diff options
-rw-r--r-- | cloudinit/net/__init__.py | 34 | ||||
-rw-r--r-- | tests/unittests/test_net.py | 32 |
2 files changed, 65 insertions, 1 deletions
diff --git a/cloudinit/net/__init__.py b/cloudinit/net/__init__.py index 3a208e43..7d7f274d 100644 --- a/cloudinit/net/__init__.py +++ b/cloudinit/net/__init__.py @@ -19,6 +19,8 @@ import base64 import errno import glob +import gzip +import io import os import re import shlex @@ -647,6 +649,36 @@ def generate_fallback_config(): return nconf +def _decomp_gzip(blob, strict=True): + # decompress blob. raise exception if not compressed unless strict=False. + with io.BytesIO(blob) as iobuf: + gzfp = None + try: + gzfp = gzip.GzipFile(mode="rb", fileobj=iobuf) + return gzfp.read() + except IOError: + if strict: + raise + return blob + finally: + if gzfp: + gzfp.close() + + +def _b64dgz(b64str, gzipped="try"): + # decode a base64 string. If gzipped is true, transparently uncompresss + # if gzipped is 'try', then try gunzip, returning the original on fail. + try: + blob = base64.b64decode(b64str) + except TypeError: + raise ValueError("Invalid base64 text: %s" % b64str) + + if not gzipped: + return blob + + return _decomp_gzip(blob, strict=gzipped != "try") + + def read_kernel_cmdline_config(files=None, mac_addrs=None, cmdline=None): if cmdline is None: cmdline = util.get_cmdline() @@ -657,7 +689,7 @@ def read_kernel_cmdline_config(files=None, mac_addrs=None, cmdline=None): if tok.startswith("network-config="): data64 = tok.split("=", 1)[1] if data64: - return util.load_yaml(base64.b64decode(data64)) + return util.load_yaml(_b64dgz(data64)) if 'ip=' not in cmdline: return None diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py index 11c0b1eb..16c44588 100644 --- a/tests/unittests/test_net.py +++ b/tests/unittests/test_net.py @@ -1,7 +1,12 @@ from cloudinit import util from cloudinit import net from .helpers import TestCase + +import base64 import copy +import io +import gzip +import json import os DHCP_CONTENT_1 = """ @@ -62,6 +67,11 @@ STATIC_EXPECTED_1 = { class TestNetConfigParsing(TestCase): + simple_cfg = { + 'config': [{"type": "physical", "name": "eth0", + "mac_address": "c0:d6:9f:2c:e8:80", + "subnets": [{"type": "dhcp4"}]}]} + def test_klibc_convert_dhcp(self): found = net._klibc_to_config_entry(DHCP_CONTENT_1) self.assertEqual(found, ('eth0', DHCP_EXPECTED_1)) @@ -93,3 +103,25 @@ class TestNetConfigParsing(TestCase): found = net.config_from_klibc_net_cfg(files=files, mac_addrs=macs) self.assertEqual(found, expected) + + def test_cmdline_with_b64(self): + data = base64.b64encode(json.dumps(self.simple_cfg).encode()) + encoded_text = data.decode() + cmdline = 'ro network-config=' + encoded_text + ' root=foo' + found = net.read_kernel_cmdline_config(cmdline=cmdline) + self.assertEqual(found, self.simple_cfg) + + def test_cmdline_with_b64_gz(self): + data = _gzip_data(json.dumps(self.simple_cfg).encode()) + encoded_text = base64.b64encode(data).decode() + cmdline = 'ro network-config=' + encoded_text + ' root=foo' + found = net.read_kernel_cmdline_config(cmdline=cmdline) + self.assertEqual(found, self.simple_cfg) + + +def _gzip_data(data): + with io.BytesIO() as iobuf: + gzfp = gzip.GzipFile(mode="wb", fileobj=iobuf) + gzfp.write(data) + gzfp.close() + return iobuf.getvalue() |