diff options
author | Chad Smith <chad.smith@canonical.com> | 2018-06-21 14:32:27 -0600 |
---|---|---|
committer | Chad Smith <chad.smith@canonical.com> | 2018-06-21 14:32:27 -0600 |
commit | ba53ceb5a8a30c10951ec3ac49b8d6ebbe09a524 (patch) | |
tree | 5b68f0602daea648d48b3dc16809b0d66fb565d1 /tests/unittests/test_util.py | |
parent | 7d1e8976bba629f30da45e814a5a97e2f4b7de3d (diff) | |
parent | 2d6e4219db73e80c135efd83753f9302f778f08d (diff) | |
download | vyos-cloud-init-ba53ceb5a8a30c10951ec3ac49b8d6ebbe09a524.tar.gz vyos-cloud-init-ba53ceb5a8a30c10951ec3ac49b8d6ebbe09a524.zip |
merge from master at 18.3
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r-- | tests/unittests/test_util.py | 139 |
1 files changed, 126 insertions, 13 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 50101906..7a203ce2 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -4,6 +4,7 @@ from __future__ import print_function import logging import os +import re import shutil import stat import tempfile @@ -265,26 +266,49 @@ class TestGetCmdline(helpers.TestCase): self.assertEqual("abcd 123", ret) -class TestLoadYaml(helpers.TestCase): +class TestLoadYaml(helpers.CiTestCase): mydefault = "7b03a8ebace993d806255121073fed52" + with_logs = True def test_simple(self): mydata = {'1': "one", '2': "two"} self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata) def test_nonallowed_returns_default(self): + '''Any unallowed types result in returning default; log the issue.''' # for now, anything not in the allowed list just returns the default. myyaml = yaml.dump({'1': "one"}) self.assertEqual(util.load_yaml(blob=myyaml, default=self.mydefault, allowed=(str,)), self.mydefault) - - def test_bogus_returns_default(self): + regex = re.compile( + r'Yaml load allows \(<(class|type) \'str\'>,\) root types, but' + r' got dict') + self.assertTrue(regex.search(self.logs.getvalue()), + msg='Missing expected yaml load error') + + def test_bogus_scan_error_returns_default(self): + '''On Yaml scan error, load_yaml returns the default and logs issue.''' badyaml = "1\n 2:" self.assertEqual(util.load_yaml(blob=badyaml, default=self.mydefault), self.mydefault) + self.assertIn( + 'Failed loading yaml blob. Invalid format at line 2 column 3:' + ' "mapping values are not allowed here', + self.logs.getvalue()) + + def test_bogus_parse_error_returns_default(self): + '''On Yaml parse error, load_yaml returns default and logs issue.''' + badyaml = "{}}" + self.assertEqual(util.load_yaml(blob=badyaml, + default=self.mydefault), + self.mydefault) + self.assertIn( + 'Failed loading yaml blob. Invalid format at line 1 column 3:' + " \"expected \'<document start>\', but found \'}\'", + self.logs.getvalue()) def test_unsafe_types(self): # should not load complex types @@ -325,7 +349,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): def test_precise_ext4_root(self): - lines = self.readResource('mountinfo_precise_ext4.txt').splitlines() + lines = helpers.readResource('mountinfo_precise_ext4.txt').splitlines() expected = ('/dev/mapper/vg0-root', 'ext4', '/') self.assertEqual(expected, util.parse_mount_info('/', lines)) @@ -347,7 +371,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) def test_raring_btrfs_root(self): - lines = self.readResource('mountinfo_raring_btrfs.txt').splitlines() + lines = helpers.readResource('mountinfo_raring_btrfs.txt').splitlines() expected = ('/dev/vda1', 'btrfs', '/') self.assertEqual(expected, util.parse_mount_info('/', lines)) @@ -373,7 +397,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): m_os.path.exists.return_value = True # mock subp command from util.get_mount_info_fs_on_zpool zpool_output.return_value = ( - self.readResource('zpool_status_simple.txt'), '' + helpers.readResource('zpool_status_simple.txt'), '' ) # save function return values and do asserts ret = util.get_device_info_from_zpool('vmzroot') @@ -406,7 +430,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): m_os.path.exists.return_value = True # mock subp command from util.get_mount_info_fs_on_zpool zpool_output.return_value = ( - self.readResource('zpool_status_simple.txt'), 'error' + helpers.readResource('zpool_status_simple.txt'), 'error' ) # save function return values and do asserts ret = util.get_device_info_from_zpool('vmzroot') @@ -414,7 +438,8 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): @mock.patch('cloudinit.util.subp') def test_parse_mount_with_ext(self, mount_out): - mount_out.return_value = (self.readResource('mount_parse_ext.txt'), '') + mount_out.return_value = ( + helpers.readResource('mount_parse_ext.txt'), '') # this one is valid and exists in mount_parse_ext.txt ret = util.parse_mount('/var') self.assertEqual(('/dev/mapper/vg00-lv_var', 'ext4', '/var'), ret) @@ -430,7 +455,8 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): @mock.patch('cloudinit.util.subp') def test_parse_mount_with_zfs(self, mount_out): - mount_out.return_value = (self.readResource('mount_parse_zfs.txt'), '') + mount_out.return_value = ( + helpers.readResource('mount_parse_zfs.txt'), '') # this one is valid and exists in mount_parse_zfs.txt ret = util.parse_mount('/var') self.assertEqual(('vmzroot/ROOT/freebsd/var', 'zfs', '/var'), ret) @@ -442,6 +468,29 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertIsNone(ret) +class TestIsX86(helpers.CiTestCase): + + def test_is_x86_matches_x86_types(self): + """is_x86 returns True if CPU architecture matches.""" + matched_arches = ['x86_64', 'i386', 'i586', 'i686'] + for arch in matched_arches: + self.assertTrue( + util.is_x86(arch), 'Expected is_x86 for arch "%s"' % arch) + + def test_is_x86_unmatched_types(self): + """is_x86 returns Fale on non-intel x86 architectures.""" + unmatched_arches = ['ia64', '9000/800', 'arm64v71'] + for arch in unmatched_arches: + self.assertFalse( + util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch) + + @mock.patch('cloudinit.util.os.uname') + def test_is_x86_calls_uname_for_architecture(self, m_uname): + """is_x86 returns True if platform from uname matches.""" + m_uname.return_value = [0, 1, 2, 3, 'x86_64'] + self.assertTrue(util.is_x86()) + + class TestReadDMIData(helpers.FilesystemMockingTestCase): def setUp(self): @@ -772,11 +821,11 @@ class TestSubp(helpers.CiTestCase): def test_subp_reads_env(self): with mock.patch.dict("os.environ", values={'FOO': 'BAR'}): - out, err = util.subp(self.printenv + ['FOO'], capture=True) + out, _err = util.subp(self.printenv + ['FOO'], capture=True) self.assertEqual('FOO=BAR', out.splitlines()[0]) def test_subp_env_and_update_env(self): - out, err = util.subp( + out, _err = util.subp( self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, env={'FOO': 'BAR'}, update_env={'HOME': '/myhome', 'K2': 'V2'}) @@ -786,7 +835,7 @@ class TestSubp(helpers.CiTestCase): def test_subp_update_env(self): extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'} with mock.patch.dict("os.environ", values=extra): - out, err = util.subp( + out, _err = util.subp( self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, update_env={'HOME': '/myhome', 'K2': 'V2'}) @@ -800,9 +849,17 @@ class TestSubp(helpers.CiTestCase): os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC) self.assertRaisesRegex(util.ProcessExecutionError, - 'Missing #! in script\?', + r'Missing #! in script\?', util.subp, (noshebang,)) + def test_subp_combined_stderr_stdout(self): + """Providing combine_capture as True redirects stderr to stdout.""" + data = b'hello world' + (out, err) = util.subp(self.stdin2err, capture=True, + combine_capture=True, decode=False, data=data) + self.assertEqual(b'', err) + self.assertEqual(data, out) + def test_returns_none_if_no_capture(self): (out, err) = util.subp(self.stdin2out, data=b'', capture=False) self.assertIsNone(err) @@ -1055,4 +1112,60 @@ class TestLoadShellContent(helpers.TestCase): '']))) +class TestGetProcEnv(helpers.TestCase): + """test get_proc_env.""" + null = b'\x00' + simple1 = b'HOME=/' + simple2 = b'PATH=/bin:/sbin' + bootflag = b'BOOTABLE_FLAG=\x80' # from LP: #1775371 + mixed = b'MIXED=' + b'ab\xccde' + + def _val_decoded(self, blob, encoding='utf-8', errors='replace'): + # return the value portion of key=val decoded. + return blob.split(b'=', 1)[1].decode(encoding, errors) + + @mock.patch("cloudinit.util.load_file") + def test_non_utf8_in_environment(self, m_load_file): + """env may have non utf-8 decodable content.""" + content = self.null.join( + (self.bootflag, self.simple1, self.simple2, self.mixed)) + m_load_file.return_value = content + + self.assertEqual( + {'BOOTABLE_FLAG': self._val_decoded(self.bootflag), + 'HOME': '/', 'PATH': '/bin:/sbin', + 'MIXED': self._val_decoded(self.mixed)}, + util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_encoding_none_returns_bytes(self, m_load_file): + """encoding none returns bytes.""" + lines = (self.bootflag, self.simple1, self.simple2, self.mixed) + content = self.null.join(lines) + m_load_file.return_value = content + + self.assertEqual( + dict([t.split(b'=') for t in lines]), + util.get_proc_env(1, encoding=None)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_all_utf8_encoded(self, m_load_file): + """common path where only utf-8 decodable content.""" + content = self.null.join((self.simple1, self.simple2)) + m_load_file.return_value = content + self.assertEqual( + {'HOME': '/', 'PATH': '/bin:/sbin'}, + util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_non_existing_file_returns_empty_dict(self, m_load_file): + """as implemented, a non-existing pid returns empty dict. + This is how it was originally implemented.""" + m_load_file.side_effect = OSError("File does not exist.") + self.assertEqual({}, util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + # vi: ts=4 expandtab |