From 76652f3e07b6f659b2fd166a6619cb427dc6bc7e Mon Sep 17 00:00:00 2001 From: Scott Moser Date: Mon, 22 Jun 2020 14:44:37 -0400 Subject: Hetzner: support reading user-data that is base64 encoded. (#448) Hetzner cloud only supports user-data as a string (presumably utf-8). In order to allow users on Hetzner to provide binary data to cloud-init, we will attempt to base64decode the userdata. The change here adds a 'maybe_b64decode' function that will decode data if and only if is base64 encoded. The reason for not using util.b64d is that we do not want the return value decoded to a string, and util.b64d will do that if it can. Additionally we call decode with validate=True which oddly is not the default. LP: #1884071 --- cloudinit/sources/DataSourceHetzner.py | 9 ++++++++- cloudinit/sources/helpers/hetzner.py | 19 +++++++++++++++++++ tests/unittests/test_datasource/test_hetzner.py | 23 +++++++++++++++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/cloudinit/sources/DataSourceHetzner.py b/cloudinit/sources/DataSourceHetzner.py index 50298330..70e4274c 100644 --- a/cloudinit/sources/DataSourceHetzner.py +++ b/cloudinit/sources/DataSourceHetzner.py @@ -59,7 +59,14 @@ class DataSourceHetzner(sources.DataSource): self.userdata_address, timeout=self.timeout, sec_between=self.wait_retry, retries=self.retries) - self.userdata_raw = ud + # Hetzner cloud does not support binary user-data. So here, do a + # base64 decode of the data if we can. The end result being that a + # user can provide base64 encoded (possibly gzipped) data as user-data. + # + # The fallout is that in the event of b64 encoded user-data, + # /var/lib/cloud-init/cloud-config.txt will not be identical to the + # user-data provided. It will be decoded. + self.userdata_raw = hc_helper.maybe_b64decode(ud) self.metadata_full = md """hostname is name provided by user at launch. The API enforces diff --git a/cloudinit/sources/helpers/hetzner.py b/cloudinit/sources/helpers/hetzner.py index 2554530d..72edb023 100644 --- a/cloudinit/sources/helpers/hetzner.py +++ b/cloudinit/sources/helpers/hetzner.py @@ -7,6 +7,9 @@ from cloudinit import log as logging from cloudinit import url_helper from cloudinit import util +import base64 +import binascii + LOG = logging.getLogger(__name__) @@ -24,3 +27,19 @@ def read_userdata(url, timeout=2, sec_between=2, retries=30): if not response.ok(): raise RuntimeError("unable to read userdata at %s" % url) return response.contents + + +def maybe_b64decode(data: bytes) -> bytes: + """base64 decode data + + If data is base64 encoded bytes, return b64decode(data). + If not, return data unmodified. + + @param data: data as bytes. TypeError is raised if not bytes. + """ + if not isinstance(data, bytes): + raise TypeError("data is '%s', expected bytes" % type(data)) + try: + return base64.b64decode(data, validate=True) + except binascii.Error: + return data diff --git a/tests/unittests/test_datasource/test_hetzner.py b/tests/unittests/test_datasource/test_hetzner.py index a9c12597..d0879545 100644 --- a/tests/unittests/test_datasource/test_hetzner.py +++ b/tests/unittests/test_datasource/test_hetzner.py @@ -5,10 +5,14 @@ # This file is part of cloud-init. See LICENSE file for license information. from cloudinit.sources import DataSourceHetzner +import cloudinit.sources.helpers.hetzner as hc_helper from cloudinit import util, settings, helpers from cloudinit.tests.helpers import mock, CiTestCase +import base64 +import pytest + METADATA = util.load_yaml(""" hostname: cloudinit-test instance-id: 123456 @@ -115,3 +119,22 @@ class TestDataSourceHetzner(CiTestCase): # These are a white box attempt to ensure it did not search. m_find_fallback.assert_not_called() m_read_md.assert_not_called() + + +class TestMaybeB64Decode: + """Test the maybe_b64decode helper function.""" + + @pytest.mark.parametrize("invalid_input", (str("not bytes"), int(4))) + def test_raises_error_on_non_bytes(self, invalid_input): + """maybe_b64decode should raise error if data is not bytes.""" + with pytest.raises(TypeError): + hc_helper.maybe_b64decode(invalid_input) + + @pytest.mark.parametrize("in_data,expected", [ + # If data is not b64 encoded, then return value should be the same. + (b"this is my data", b"this is my data"), + # If data is b64 encoded, then return value should be decoded. + (base64.b64encode(b"data"), b"data"), + ]) + def test_happy_path(self, in_data, expected): + assert expected == hc_helper.maybe_b64decode(in_data) -- cgit v1.2.3