diff options
-rw-r--r-- | cloudinit/sources/DataSourceHetzner.py | 9 | ||||
-rw-r--r-- | cloudinit/sources/helpers/hetzner.py | 19 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_hetzner.py | 23 |
3 files changed, 50 insertions, 1 deletions
diff --git a/cloudinit/sources/DataSourceHetzner.py b/cloudinit/sources/DataSourceHetzner.py index 50298330..70e4274c 100644 --- a/cloudinit/sources/DataSourceHetzner.py +++ b/cloudinit/sources/DataSourceHetzner.py @@ -59,7 +59,14 @@ class DataSourceHetzner(sources.DataSource): self.userdata_address, timeout=self.timeout, sec_between=self.wait_retry, retries=self.retries) - self.userdata_raw = ud + # Hetzner cloud does not support binary user-data. So here, do a + # base64 decode of the data if we can. The end result being that a + # user can provide base64 encoded (possibly gzipped) data as user-data. + # + # The fallout is that in the event of b64 encoded user-data, + # /var/lib/cloud-init/cloud-config.txt will not be identical to the + # user-data provided. It will be decoded. + self.userdata_raw = hc_helper.maybe_b64decode(ud) self.metadata_full = md """hostname is name provided by user at launch. The API enforces diff --git a/cloudinit/sources/helpers/hetzner.py b/cloudinit/sources/helpers/hetzner.py index 2554530d..72edb023 100644 --- a/cloudinit/sources/helpers/hetzner.py +++ b/cloudinit/sources/helpers/hetzner.py @@ -7,6 +7,9 @@ from cloudinit import log as logging from cloudinit import url_helper from cloudinit import util +import base64 +import binascii + LOG = logging.getLogger(__name__) @@ -24,3 +27,19 @@ def read_userdata(url, timeout=2, sec_between=2, retries=30): if not response.ok(): raise RuntimeError("unable to read userdata at %s" % url) return response.contents + + +def maybe_b64decode(data: bytes) -> bytes: + """base64 decode data + + If data is base64 encoded bytes, return b64decode(data). + If not, return data unmodified. + + @param data: data as bytes. TypeError is raised if not bytes. + """ + if not isinstance(data, bytes): + raise TypeError("data is '%s', expected bytes" % type(data)) + try: + return base64.b64decode(data, validate=True) + except binascii.Error: + return data diff --git a/tests/unittests/test_datasource/test_hetzner.py b/tests/unittests/test_datasource/test_hetzner.py index a9c12597..d0879545 100644 --- a/tests/unittests/test_datasource/test_hetzner.py +++ b/tests/unittests/test_datasource/test_hetzner.py @@ -5,10 +5,14 @@ # This file is part of cloud-init. See LICENSE file for license information. from cloudinit.sources import DataSourceHetzner +import cloudinit.sources.helpers.hetzner as hc_helper from cloudinit import util, settings, helpers from cloudinit.tests.helpers import mock, CiTestCase +import base64 +import pytest + METADATA = util.load_yaml(""" hostname: cloudinit-test instance-id: 123456 @@ -115,3 +119,22 @@ class TestDataSourceHetzner(CiTestCase): # These are a white box attempt to ensure it did not search. m_find_fallback.assert_not_called() m_read_md.assert_not_called() + + +class TestMaybeB64Decode: + """Test the maybe_b64decode helper function.""" + + @pytest.mark.parametrize("invalid_input", (str("not bytes"), int(4))) + def test_raises_error_on_non_bytes(self, invalid_input): + """maybe_b64decode should raise error if data is not bytes.""" + with pytest.raises(TypeError): + hc_helper.maybe_b64decode(invalid_input) + + @pytest.mark.parametrize("in_data,expected", [ + # If data is not b64 encoded, then return value should be the same. + (b"this is my data", b"this is my data"), + # If data is b64 encoded, then return value should be decoded. + (base64.b64encode(b"data"), b"data"), + ]) + def test_happy_path(self, in_data, expected): + assert expected == hc_helper.maybe_b64decode(in_data) |