diff options
author | Scott Moser <smoser@ubuntu.com> | 2018-06-13 12:42:43 -0400 |
---|---|---|
committer | Scott Moser <smoser@brickies.net> | 2018-06-13 12:42:43 -0400 |
commit | 171378613ef4de3c1bb4170a10ec1748ac62f39f (patch) | |
tree | 5bbf7543e2ee27ef0dd851fc3e3eb7ef8d8db022 | |
parent | 27283c31f4bf85f40588cfa3b31389d70ec00243 (diff) | |
download | vyos-cloud-init-171378613ef4de3c1bb4170a10ec1748ac62f39f.tar.gz vyos-cloud-init-171378613ef4de3c1bb4170a10ec1748ac62f39f.zip |
Fix get_proc_env for pids that have non-utf8 content in environment.
There is no requirement that the environment of a process contains
only utf-8 data. This modifies get_proc_env to support it reading
data as binary and decoding if provided with an encoding.
The default case is now that we now do:
contents.decode('utf-8', 'replace')
rather than
contents.decode('utf-8', 'strict')
LP: #1775371
-rw-r--r-- | cloudinit/util.py | 35 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 56 |
2 files changed, 78 insertions, 13 deletions
diff --git a/cloudinit/util.py b/cloudinit/util.py index 0017de72..26a41122 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -2131,24 +2131,33 @@ def is_container(): return False -def get_proc_env(pid): +def get_proc_env(pid, encoding='utf-8', errors='replace'): """ Return the environment in a dict that a given process id was started with. - """ - env = {} - fn = os.path.join("/proc/", str(pid), "environ") + @param encoding: if true, then decoding will be done with + .decode(encoding, errors) and text will be returned. + if false then binary will be returned. + @param errors: only used if encoding is true.""" + fn = os.path.join("/proc", str(pid), "environ") + try: - contents = load_file(fn) - toks = contents.split("\x00") - for tok in toks: - if tok == "": - continue - (name, val) = tok.split("=", 1) - if name: - env[name] = val + contents = load_file(fn, decode=False) except (IOError, OSError): - pass + return {} + + env = {} + null, equal = (b"\x00", b"=") + if encoding: + null, equal = ("\x00", "=") + contents = contents.decode(encoding, errors) + + for tok in contents.split(null): + if not tok: + continue + (name, val) = tok.split(equal, 1) + if name: + env[name] = val return env diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 2db8e3fa..20479f66 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1089,4 +1089,60 @@ class TestLoadShellContent(helpers.TestCase): '']))) +class TestGetProcEnv(helpers.TestCase): + """test get_proc_env.""" + null = b'\x00' + simple1 = b'HOME=/' + simple2 = b'PATH=/bin:/sbin' + bootflag = b'BOOTABLE_FLAG=\x80' # from LP: #1775371 + mixed = b'MIXED=' + b'ab\xccde' + + def _val_decoded(self, blob, encoding='utf-8', errors='replace'): + # return the value portion of key=val decoded. + return blob.split(b'=', 1)[1].decode(encoding, errors) + + @mock.patch("cloudinit.util.load_file") + def test_non_utf8_in_environment(self, m_load_file): + """env may have non utf-8 decodable content.""" + content = self.null.join( + (self.bootflag, self.simple1, self.simple2, self.mixed)) + m_load_file.return_value = content + + self.assertEqual( + {'BOOTABLE_FLAG': self._val_decoded(self.bootflag), + 'HOME': '/', 'PATH': '/bin:/sbin', + 'MIXED': self._val_decoded(self.mixed)}, + util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_encoding_none_returns_bytes(self, m_load_file): + """encoding none returns bytes.""" + lines = (self.bootflag, self.simple1, self.simple2, self.mixed) + content = self.null.join(lines) + m_load_file.return_value = content + + self.assertEqual( + dict([t.split(b'=') for t in lines]), + util.get_proc_env(1, encoding=None)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_all_utf8_encoded(self, m_load_file): + """common path where only utf-8 decodable content.""" + content = self.null.join((self.simple1, self.simple2)) + m_load_file.return_value = content + self.assertEqual( + {'HOME': '/', 'PATH': '/bin:/sbin'}, + util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + + @mock.patch("cloudinit.util.load_file") + def test_non_existing_file_returns_empty_dict(self, m_load_file): + """as implemented, a non-existing pid returns empty dict. + This is how it was originally implemented.""" + m_load_file.side_effect = OSError("File does not exist.") + self.assertEqual({}, util.get_proc_env(1)) + self.assertEqual(1, m_load_file.call_count) + # vi: ts=4 expandtab |