diff options
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r-- | tests/unittests/test_util.py | 164 |
1 files changed, 140 insertions, 24 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 50101906..5a14479a 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -4,6 +4,7 @@ from __future__ import print_function import logging import os +import re import shutil import stat import tempfile @@ -23,6 +24,7 @@ except ImportError: BASH = util.which('bash') +BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name' class FakeSelinux(object): @@ -265,26 +267,49 @@ class TestGetCmdline(helpers.TestCase): self.assertEqual("abcd 123", ret) -class TestLoadYaml(helpers.TestCase): +class TestLoadYaml(helpers.CiTestCase): mydefault = "7b03a8ebace993d806255121073fed52" + with_logs = True def test_simple(self): mydata = {'1': "one", '2': "two"} self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata) def test_nonallowed_returns_default(self): + '''Any unallowed types result in returning default; log the issue.''' # for now, anything not in the allowed list just returns the default. myyaml = yaml.dump({'1': "one"}) self.assertEqual(util.load_yaml(blob=myyaml, default=self.mydefault, allowed=(str,)), self.mydefault) - - def test_bogus_returns_default(self): + regex = re.compile( + r'Yaml load allows \(<(class|type) \'str\'>,\) root types, but' + r' got dict') + self.assertTrue(regex.search(self.logs.getvalue()), + msg='Missing expected yaml load error') + + def test_bogus_scan_error_returns_default(self): + '''On Yaml scan error, load_yaml returns the default and logs issue.''' badyaml = "1\n 2:" self.assertEqual(util.load_yaml(blob=badyaml, default=self.mydefault), self.mydefault) + self.assertIn( + 'Failed loading yaml blob. Invalid format at line 2 column 3:' + ' "mapping values are not allowed here', + self.logs.getvalue()) + + def test_bogus_parse_error_returns_default(self): + '''On Yaml parse error, load_yaml returns default and logs issue.''' + badyaml = "{}}" + self.assertEqual(util.load_yaml(blob=badyaml, + default=self.mydefault), + self.mydefault) + self.assertIn( + 'Failed loading yaml blob. Invalid format at line 1 column 3:' + " \"expected \'<document start>\', but found \'}\'", + self.logs.getvalue()) def test_unsafe_types(self): # should not load complex types @@ -325,7 +350,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): def test_precise_ext4_root(self): - lines = self.readResource('mountinfo_precise_ext4.txt').splitlines() + lines = helpers.readResource('mountinfo_precise_ext4.txt').splitlines() expected = ('/dev/mapper/vg0-root', 'ext4', '/') self.assertEqual(expected, util.parse_mount_info('/', lines)) @@ -347,7 +372,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) def test_raring_btrfs_root(self): - lines = self.readResource('mountinfo_raring_btrfs.txt').splitlines() + lines = helpers.readResource('mountinfo_raring_btrfs.txt').splitlines() expected = ('/dev/vda1', 'btrfs', '/') self.assertEqual(expected, util.parse_mount_info('/', lines)) @@ -373,7 +398,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): m_os.path.exists.return_value = True # mock subp command from util.get_mount_info_fs_on_zpool zpool_output.return_value = ( - self.readResource('zpool_status_simple.txt'), '' + helpers.readResource('zpool_status_simple.txt'), '' ) # save function return values and do asserts ret = util.get_device_info_from_zpool('vmzroot') @@ -406,7 +431,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): m_os.path.exists.return_value = True # mock subp command from util.get_mount_info_fs_on_zpool zpool_output.return_value = ( - self.readResource('zpool_status_simple.txt'), 'error' + helpers.readResource('zpool_status_simple.txt'), 'error' ) # save function return values and do asserts ret = util.get_device_info_from_zpool('vmzroot') @@ -414,7 +439,8 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): @mock.patch('cloudinit.util.subp') def test_parse_mount_with_ext(self, mount_out): - mount_out.return_value = (self.readResource('mount_parse_ext.txt'), '') + mount_out.return_value = ( + helpers.readResource('mount_parse_ext.txt'), '') # this one is valid and exists in mount_parse_ext.txt ret = util.parse_mount('/var') self.assertEqual(('/dev/mapper/vg00-lv_var', 'ext4', '/var'), ret) @@ -430,7 +456,8 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): @mock.patch('cloudinit.util.subp') def test_parse_mount_with_zfs(self, mount_out): - mount_out.return_value = (self.readResource('mount_parse_zfs.txt'), '') + mount_out.return_value = ( + helpers.readResource('mount_parse_zfs.txt'), '') # this one is valid and exists in mount_parse_zfs.txt ret = util.parse_mount('/var') self.assertEqual(('vmzroot/ROOT/freebsd/var', 'zfs', '/var'), ret) @@ -442,6 +469,29 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertIsNone(ret) +class TestIsX86(helpers.CiTestCase): + + def test_is_x86_matches_x86_types(self): + """is_x86 returns True if CPU architecture matches.""" + matched_arches = ['x86_64', 'i386', 'i586', 'i686'] + for arch in matched_arches: + self.assertTrue( + util.is_x86(arch), 'Expected is_x86 for arch "%s"' % arch) + + def test_is_x86_unmatched_types(self): + """is_x86 returns Fale on non-intel x86 architectures.""" + unmatched_arches = ['ia64', '9000/800', 'arm64v71'] + for arch in unmatched_arches: + self.assertFalse( + util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch) + + @mock.patch('cloudinit.util.os.uname') + def test_is_x86_calls_uname_for_architecture(self, m_uname): + """is_x86 returns True if platform from uname matches.""" + m_uname.return_value = [0, 1, 2, 3, 'x86_64'] + self.assertTrue(util.is_x86()) + + class TestReadDMIData(helpers.FilesystemMockingTestCase): def setUp(self): @@ -693,6 +743,8 @@ class TestReadSeeded(helpers.TestCase): class TestSubp(helpers.CiTestCase): with_logs = True + allowed_subp = [BASH, 'cat', helpers.CiTestCase.SUBP_SHELL_TRUE, + BOGUS_COMMAND, sys.executable] stdin2err = [BASH, '-c', 'cat >&2'] stdin2out = ['cat'] @@ -700,7 +752,6 @@ class TestSubp(helpers.CiTestCase): utf8_valid = b'start \xc3\xa9 end' utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7' printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--'] - bogus_command = 'this-is-not-expected-to-be-a-program-name' def printf_cmd(self, *args): # bash's printf supports \xaa. So does /usr/bin/printf @@ -772,11 +823,11 @@ class TestSubp(helpers.CiTestCase): def test_subp_reads_env(self): with mock.patch.dict("os.environ", values={'FOO': 'BAR'}): - out, err = util.subp(self.printenv + ['FOO'], capture=True) + out, _err = util.subp(self.printenv + ['FOO'], capture=True) self.assertEqual('FOO=BAR', out.splitlines()[0]) def test_subp_env_and_update_env(self): - out, err = util.subp( + out, _err = util.subp( self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, env={'FOO': 'BAR'}, update_env={'HOME': '/myhome', 'K2': 'V2'}) @@ -786,7 +837,7 @@ class TestSubp(helpers.CiTestCase): def test_subp_update_env(self): extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'} with mock.patch.dict("os.environ", values=extra): - out, err = util.subp( + out, _err = util.subp( self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, update_env={'HOME': '/myhome', 'K2': 'V2'}) @@ -799,9 +850,18 @@ class TestSubp(helpers.CiTestCase): util.write_file(noshebang, 'true\n') os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC) - self.assertRaisesRegex(util.ProcessExecutionError, - 'Missing #! in script\?', - util.subp, (noshebang,)) + with self.allow_subp([noshebang]): + self.assertRaisesRegex(util.ProcessExecutionError, + r'Missing #! in script\?', + util.subp, (noshebang,)) + + def test_subp_combined_stderr_stdout(self): + """Providing combine_capture as True redirects stderr to stdout.""" + data = b'hello world' + (out, err) = util.subp(self.stdin2err, capture=True, + combine_capture=True, decode=False, data=data) + self.assertEqual(b'', err) + self.assertEqual(data, out) def test_returns_none_if_no_capture(self): (out, err) = util.subp(self.stdin2out, data=b'', capture=False) @@ -811,14 +871,14 @@ class TestSubp(helpers.CiTestCase): def test_exception_has_out_err_are_bytes_if_decode_false(self): """Raised exc should have stderr, stdout as bytes if no decode.""" with self.assertRaises(util.ProcessExecutionError) as cm: - util.subp([self.bogus_command], decode=False) + util.subp([BOGUS_COMMAND], decode=False) self.assertTrue(isinstance(cm.exception.stdout, bytes)) self.assertTrue(isinstance(cm.exception.stderr, bytes)) def test_exception_has_out_err_are_bytes_if_decode_true(self): """Raised exc should have stderr, stdout as string if no decode.""" with self.assertRaises(util.ProcessExecutionError) as cm: - util.subp([self.bogus_command], decode=True) + util.subp([BOGUS_COMMAND], decode=True) self.assertTrue(isinstance(cm.exception.stdout, six.string_types)) self.assertTrue(isinstance(cm.exception.stderr, six.string_types)) @@ -868,10 +928,10 @@ class TestSubp(helpers.CiTestCase): logs.append(log) with self.assertRaises(util.ProcessExecutionError): - util.subp([self.bogus_command], status_cb=status_cb) + util.subp([BOGUS_COMMAND], status_cb=status_cb) expected = [ - 'Begin run command: {cmd}\n'.format(cmd=self.bogus_command), + 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND), 'ERROR: End run command: invalid command provided\n'] self.assertEqual(expected, logs) @@ -883,13 +943,13 @@ class TestSubp(helpers.CiTestCase): logs.append(log) with self.assertRaises(util.ProcessExecutionError): - util.subp(['ls', '/I/dont/exist'], status_cb=status_cb) - util.subp(['ls'], status_cb=status_cb) + util.subp([BASH, '-c', 'exit 2'], status_cb=status_cb) + util.subp([BASH, '-c', 'exit 0'], status_cb=status_cb) expected = [ - 'Begin run command: ls /I/dont/exist\n', + 'Begin run command: %s -c exit 2\n' % BASH, 'ERROR: End run command: exit(2)\n', - 'Begin run command: ls\n', + 'Begin run command: %s -c exit 0\n' % BASH, 'End run command: exit(0)\n'] self.assertEqual(expected, logs) @@ -1055,4 +1115,60 @@ class TestLoadShellContent(helpers.TestCase): '']))) +class TestGetProcEnv(helpers.TestCase): + """test get_proc_env.""" + null = b'\x00' + simple1 = b'HOME=/' + simple2 = b'PATH=/bin:/sbin' + bootflag = b'BOOTABLE_FLAG=\x80' # from LP: #1775371 + mixed = b'MIXED=' + b'ab\xccde' + + def _val_decoded(self, blob, encoding='utf-8', errors='replace'): + # return the value portion of key=val decoded. + return blob.split(b'=', 1)[1].decode(encoding, errors) + + @mock.patch("cloudinit.util.load_file") + def test_non_utf8_in_environment(self, m_load_file): + """env may have non utf-8 decodable content.""" + content = self.null.join( + (self.bootflag, self.simple1, self.simple2, self.mixed)) + m_load_file.return_value = content + + self.assertEqual( + {'BOOTABLE_FLAG': self._val_decoded(self.bootflag), + 'HOME': '/', 'PATH': '/bin:/sbin', + 'MIXED': self._val_decoded(self.mixed)}, + util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_encoding_none_returns_bytes(self, m_load_file): + """encoding none returns bytes.""" + lines = (self.bootflag, self.simple1, self.simple2, self.mixed) + content = self.null.join(lines) + m_load_file.return_value = content + + self.assertEqual( + dict([t.split(b'=') for t in lines]), + util.get_proc_env(1, encoding=None)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_all_utf8_encoded(self, m_load_file): + """common path where only utf-8 decodable content.""" + content = self.null.join((self.simple1, self.simple2)) + m_load_file.return_value = content + self.assertEqual( + {'HOME': '/', 'PATH': '/bin:/sbin'}, + util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_non_existing_file_returns_empty_dict(self, m_load_file): + """as implemented, a non-existing pid returns empty dict. + This is how it was originally implemented.""" + m_load_file.side_effect = OSError("File does not exist.") + self.assertEqual({}, util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + # vi: ts=4 expandtab |