summaryrefslogtreecommitdiff
path: root/tests/unittests/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r--tests/unittests/test_util.py139
1 files changed, 126 insertions, 13 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 50101906..7a203ce2 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -4,6 +4,7 @@ from __future__ import print_function
import logging
import os
+import re
import shutil
import stat
import tempfile
@@ -265,26 +266,49 @@ class TestGetCmdline(helpers.TestCase):
self.assertEqual("abcd 123", ret)
-class TestLoadYaml(helpers.TestCase):
+class TestLoadYaml(helpers.CiTestCase):
mydefault = "7b03a8ebace993d806255121073fed52"
+ with_logs = True
def test_simple(self):
mydata = {'1': "one", '2': "two"}
self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata)
def test_nonallowed_returns_default(self):
+ '''Any unallowed types result in returning default; log the issue.'''
# for now, anything not in the allowed list just returns the default.
myyaml = yaml.dump({'1': "one"})
self.assertEqual(util.load_yaml(blob=myyaml,
default=self.mydefault,
allowed=(str,)),
self.mydefault)
-
- def test_bogus_returns_default(self):
+ regex = re.compile(
+ r'Yaml load allows \(<(class|type) \'str\'>,\) root types, but'
+ r' got dict')
+ self.assertTrue(regex.search(self.logs.getvalue()),
+ msg='Missing expected yaml load error')
+
+ def test_bogus_scan_error_returns_default(self):
+ '''On Yaml scan error, load_yaml returns the default and logs issue.'''
badyaml = "1\n 2:"
self.assertEqual(util.load_yaml(blob=badyaml,
default=self.mydefault),
self.mydefault)
+ self.assertIn(
+ 'Failed loading yaml blob. Invalid format at line 2 column 3:'
+ ' "mapping values are not allowed here',
+ self.logs.getvalue())
+
+ def test_bogus_parse_error_returns_default(self):
+ '''On Yaml parse error, load_yaml returns default and logs issue.'''
+ badyaml = "{}}"
+ self.assertEqual(util.load_yaml(blob=badyaml,
+ default=self.mydefault),
+ self.mydefault)
+ self.assertIn(
+ 'Failed loading yaml blob. Invalid format at line 1 column 3:'
+ " \"expected \'<document start>\', but found \'}\'",
+ self.logs.getvalue())
def test_unsafe_types(self):
# should not load complex types
@@ -325,7 +349,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
def test_precise_ext4_root(self):
- lines = self.readResource('mountinfo_precise_ext4.txt').splitlines()
+ lines = helpers.readResource('mountinfo_precise_ext4.txt').splitlines()
expected = ('/dev/mapper/vg0-root', 'ext4', '/')
self.assertEqual(expected, util.parse_mount_info('/', lines))
@@ -347,7 +371,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
def test_raring_btrfs_root(self):
- lines = self.readResource('mountinfo_raring_btrfs.txt').splitlines()
+ lines = helpers.readResource('mountinfo_raring_btrfs.txt').splitlines()
expected = ('/dev/vda1', 'btrfs', '/')
self.assertEqual(expected, util.parse_mount_info('/', lines))
@@ -373,7 +397,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
m_os.path.exists.return_value = True
# mock subp command from util.get_mount_info_fs_on_zpool
zpool_output.return_value = (
- self.readResource('zpool_status_simple.txt'), ''
+ helpers.readResource('zpool_status_simple.txt'), ''
)
# save function return values and do asserts
ret = util.get_device_info_from_zpool('vmzroot')
@@ -406,7 +430,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
m_os.path.exists.return_value = True
# mock subp command from util.get_mount_info_fs_on_zpool
zpool_output.return_value = (
- self.readResource('zpool_status_simple.txt'), 'error'
+ helpers.readResource('zpool_status_simple.txt'), 'error'
)
# save function return values and do asserts
ret = util.get_device_info_from_zpool('vmzroot')
@@ -414,7 +438,8 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
@mock.patch('cloudinit.util.subp')
def test_parse_mount_with_ext(self, mount_out):
- mount_out.return_value = (self.readResource('mount_parse_ext.txt'), '')
+ mount_out.return_value = (
+ helpers.readResource('mount_parse_ext.txt'), '')
# this one is valid and exists in mount_parse_ext.txt
ret = util.parse_mount('/var')
self.assertEqual(('/dev/mapper/vg00-lv_var', 'ext4', '/var'), ret)
@@ -430,7 +455,8 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
@mock.patch('cloudinit.util.subp')
def test_parse_mount_with_zfs(self, mount_out):
- mount_out.return_value = (self.readResource('mount_parse_zfs.txt'), '')
+ mount_out.return_value = (
+ helpers.readResource('mount_parse_zfs.txt'), '')
# this one is valid and exists in mount_parse_zfs.txt
ret = util.parse_mount('/var')
self.assertEqual(('vmzroot/ROOT/freebsd/var', 'zfs', '/var'), ret)
@@ -442,6 +468,29 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
self.assertIsNone(ret)
+class TestIsX86(helpers.CiTestCase):
+
+ def test_is_x86_matches_x86_types(self):
+ """is_x86 returns True if CPU architecture matches."""
+ matched_arches = ['x86_64', 'i386', 'i586', 'i686']
+ for arch in matched_arches:
+ self.assertTrue(
+ util.is_x86(arch), 'Expected is_x86 for arch "%s"' % arch)
+
+ def test_is_x86_unmatched_types(self):
+ """is_x86 returns Fale on non-intel x86 architectures."""
+ unmatched_arches = ['ia64', '9000/800', 'arm64v71']
+ for arch in unmatched_arches:
+ self.assertFalse(
+ util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch)
+
+ @mock.patch('cloudinit.util.os.uname')
+ def test_is_x86_calls_uname_for_architecture(self, m_uname):
+ """is_x86 returns True if platform from uname matches."""
+ m_uname.return_value = [0, 1, 2, 3, 'x86_64']
+ self.assertTrue(util.is_x86())
+
+
class TestReadDMIData(helpers.FilesystemMockingTestCase):
def setUp(self):
@@ -772,11 +821,11 @@ class TestSubp(helpers.CiTestCase):
def test_subp_reads_env(self):
with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
- out, err = util.subp(self.printenv + ['FOO'], capture=True)
+ out, _err = util.subp(self.printenv + ['FOO'], capture=True)
self.assertEqual('FOO=BAR', out.splitlines()[0])
def test_subp_env_and_update_env(self):
- out, err = util.subp(
+ out, _err = util.subp(
self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
env={'FOO': 'BAR'},
update_env={'HOME': '/myhome', 'K2': 'V2'})
@@ -786,7 +835,7 @@ class TestSubp(helpers.CiTestCase):
def test_subp_update_env(self):
extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
with mock.patch.dict("os.environ", values=extra):
- out, err = util.subp(
+ out, _err = util.subp(
self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
update_env={'HOME': '/myhome', 'K2': 'V2'})
@@ -800,9 +849,17 @@ class TestSubp(helpers.CiTestCase):
os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
self.assertRaisesRegex(util.ProcessExecutionError,
- 'Missing #! in script\?',
+ r'Missing #! in script\?',
util.subp, (noshebang,))
+ def test_subp_combined_stderr_stdout(self):
+ """Providing combine_capture as True redirects stderr to stdout."""
+ data = b'hello world'
+ (out, err) = util.subp(self.stdin2err, capture=True,
+ combine_capture=True, decode=False, data=data)
+ self.assertEqual(b'', err)
+ self.assertEqual(data, out)
+
def test_returns_none_if_no_capture(self):
(out, err) = util.subp(self.stdin2out, data=b'', capture=False)
self.assertIsNone(err)
@@ -1055,4 +1112,60 @@ class TestLoadShellContent(helpers.TestCase):
''])))
+class TestGetProcEnv(helpers.TestCase):
+ """test get_proc_env."""
+ null = b'\x00'
+ simple1 = b'HOME=/'
+ simple2 = b'PATH=/bin:/sbin'
+ bootflag = b'BOOTABLE_FLAG=\x80' # from LP: #1775371
+ mixed = b'MIXED=' + b'ab\xccde'
+
+ def _val_decoded(self, blob, encoding='utf-8', errors='replace'):
+ # return the value portion of key=val decoded.
+ return blob.split(b'=', 1)[1].decode(encoding, errors)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_non_utf8_in_environment(self, m_load_file):
+ """env may have non utf-8 decodable content."""
+ content = self.null.join(
+ (self.bootflag, self.simple1, self.simple2, self.mixed))
+ m_load_file.return_value = content
+
+ self.assertEqual(
+ {'BOOTABLE_FLAG': self._val_decoded(self.bootflag),
+ 'HOME': '/', 'PATH': '/bin:/sbin',
+ 'MIXED': self._val_decoded(self.mixed)},
+ util.get_proc_env(1))
+ self.assertEqual(1, m_load_file.call_count)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_encoding_none_returns_bytes(self, m_load_file):
+ """encoding none returns bytes."""
+ lines = (self.bootflag, self.simple1, self.simple2, self.mixed)
+ content = self.null.join(lines)
+ m_load_file.return_value = content
+
+ self.assertEqual(
+ dict([t.split(b'=') for t in lines]),
+ util.get_proc_env(1, encoding=None))
+ self.assertEqual(1, m_load_file.call_count)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_all_utf8_encoded(self, m_load_file):
+ """common path where only utf-8 decodable content."""
+ content = self.null.join((self.simple1, self.simple2))
+ m_load_file.return_value = content
+ self.assertEqual(
+ {'HOME': '/', 'PATH': '/bin:/sbin'},
+ util.get_proc_env(1))
+ self.assertEqual(1, m_load_file.call_count)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_non_existing_file_returns_empty_dict(self, m_load_file):
+ """as implemented, a non-existing pid returns empty dict.
+ This is how it was originally implemented."""
+ m_load_file.side_effect = OSError("File does not exist.")
+ self.assertEqual({}, util.get_proc_env(1))
+ self.assertEqual(1, m_load_file.call_count)
+
# vi: ts=4 expandtab