summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2016-03-24 11:19:41 -0400
committerScott Moser <smoser@ubuntu.com>2016-03-24 11:19:41 -0400
commitf9cae62c2f70c582a6b12a98911dc82180881364 (patch)
tree06e98b536bb6bb371be8695d148285e53a1bd224
parent740facb79c70cd8e3380e08acb4f5c5a285bb1ae (diff)
downloadvyos-cloud-init-f9cae62c2f70c582a6b12a98911dc82180881364.tar.gz
vyos-cloud-init-f9cae62c2f70c582a6b12a98911dc82180881364.zip
add suport for base64 encoded gzipped text on command line
add tests to show this functional.
-rw-r--r--cloudinit/net/__init__.py34
-rw-r--r--tests/unittests/test_net.py32
2 files changed, 65 insertions, 1 deletions
diff --git a/cloudinit/net/__init__.py b/cloudinit/net/__init__.py
index 3a208e43..7d7f274d 100644
--- a/cloudinit/net/__init__.py
+++ b/cloudinit/net/__init__.py
@@ -19,6 +19,8 @@
import base64
import errno
import glob
+import gzip
+import io
import os
import re
import shlex
@@ -647,6 +649,36 @@ def generate_fallback_config():
return nconf
+def _decomp_gzip(blob, strict=True):
+ # decompress blob. raise exception if not compressed unless strict=False.
+ with io.BytesIO(blob) as iobuf:
+ gzfp = None
+ try:
+ gzfp = gzip.GzipFile(mode="rb", fileobj=iobuf)
+ return gzfp.read()
+ except IOError:
+ if strict:
+ raise
+ return blob
+ finally:
+ if gzfp:
+ gzfp.close()
+
+
+def _b64dgz(b64str, gzipped="try"):
+ # decode a base64 string. If gzipped is true, transparently uncompresss
+ # if gzipped is 'try', then try gunzip, returning the original on fail.
+ try:
+ blob = base64.b64decode(b64str)
+ except TypeError:
+ raise ValueError("Invalid base64 text: %s" % b64str)
+
+ if not gzipped:
+ return blob
+
+ return _decomp_gzip(blob, strict=gzipped != "try")
+
+
def read_kernel_cmdline_config(files=None, mac_addrs=None, cmdline=None):
if cmdline is None:
cmdline = util.get_cmdline()
@@ -657,7 +689,7 @@ def read_kernel_cmdline_config(files=None, mac_addrs=None, cmdline=None):
if tok.startswith("network-config="):
data64 = tok.split("=", 1)[1]
if data64:
- return util.load_yaml(base64.b64decode(data64))
+ return util.load_yaml(_b64dgz(data64))
if 'ip=' not in cmdline:
return None
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 11c0b1eb..16c44588 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1,7 +1,12 @@
from cloudinit import util
from cloudinit import net
from .helpers import TestCase
+
+import base64
import copy
+import io
+import gzip
+import json
import os
DHCP_CONTENT_1 = """
@@ -62,6 +67,11 @@ STATIC_EXPECTED_1 = {
class TestNetConfigParsing(TestCase):
+ simple_cfg = {
+ 'config': [{"type": "physical", "name": "eth0",
+ "mac_address": "c0:d6:9f:2c:e8:80",
+ "subnets": [{"type": "dhcp4"}]}]}
+
def test_klibc_convert_dhcp(self):
found = net._klibc_to_config_entry(DHCP_CONTENT_1)
self.assertEqual(found, ('eth0', DHCP_EXPECTED_1))
@@ -93,3 +103,25 @@ class TestNetConfigParsing(TestCase):
found = net.config_from_klibc_net_cfg(files=files, mac_addrs=macs)
self.assertEqual(found, expected)
+
+ def test_cmdline_with_b64(self):
+ data = base64.b64encode(json.dumps(self.simple_cfg).encode())
+ encoded_text = data.decode()
+ cmdline = 'ro network-config=' + encoded_text + ' root=foo'
+ found = net.read_kernel_cmdline_config(cmdline=cmdline)
+ self.assertEqual(found, self.simple_cfg)
+
+ def test_cmdline_with_b64_gz(self):
+ data = _gzip_data(json.dumps(self.simple_cfg).encode())
+ encoded_text = base64.b64encode(data).decode()
+ cmdline = 'ro network-config=' + encoded_text + ' root=foo'
+ found = net.read_kernel_cmdline_config(cmdline=cmdline)
+ self.assertEqual(found, self.simple_cfg)
+
+
+def _gzip_data(data):
+ with io.BytesIO() as iobuf:
+ gzfp = gzip.GzipFile(mode="wb", fileobj=iobuf)
+ gzfp.write(data)
+ gzfp.close()
+ return iobuf.getvalue()