summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/sources/DataSourceHetzner.py9
-rw-r--r--cloudinit/sources/helpers/hetzner.py19
-rw-r--r--tests/unittests/test_datasource/test_hetzner.py23
3 files changed, 50 insertions, 1 deletions
diff --git a/cloudinit/sources/DataSourceHetzner.py b/cloudinit/sources/DataSourceHetzner.py
index 50298330..70e4274c 100644
--- a/cloudinit/sources/DataSourceHetzner.py
+++ b/cloudinit/sources/DataSourceHetzner.py
@@ -59,7 +59,14 @@ class DataSourceHetzner(sources.DataSource):
self.userdata_address, timeout=self.timeout,
sec_between=self.wait_retry, retries=self.retries)
- self.userdata_raw = ud
+ # Hetzner cloud does not support binary user-data. So here, do a
+ # base64 decode of the data if we can. The end result being that a
+ # user can provide base64 encoded (possibly gzipped) data as user-data.
+ #
+ # The fallout is that in the event of b64 encoded user-data,
+ # /var/lib/cloud-init/cloud-config.txt will not be identical to the
+ # user-data provided. It will be decoded.
+ self.userdata_raw = hc_helper.maybe_b64decode(ud)
self.metadata_full = md
"""hostname is name provided by user at launch. The API enforces
diff --git a/cloudinit/sources/helpers/hetzner.py b/cloudinit/sources/helpers/hetzner.py
index 2554530d..72edb023 100644
--- a/cloudinit/sources/helpers/hetzner.py
+++ b/cloudinit/sources/helpers/hetzner.py
@@ -7,6 +7,9 @@ from cloudinit import log as logging
from cloudinit import url_helper
from cloudinit import util
+import base64
+import binascii
+
LOG = logging.getLogger(__name__)
@@ -24,3 +27,19 @@ def read_userdata(url, timeout=2, sec_between=2, retries=30):
if not response.ok():
raise RuntimeError("unable to read userdata at %s" % url)
return response.contents
+
+
+def maybe_b64decode(data: bytes) -> bytes:
+ """base64 decode data
+
+ If data is base64 encoded bytes, return b64decode(data).
+ If not, return data unmodified.
+
+ @param data: data as bytes. TypeError is raised if not bytes.
+ """
+ if not isinstance(data, bytes):
+ raise TypeError("data is '%s', expected bytes" % type(data))
+ try:
+ return base64.b64decode(data, validate=True)
+ except binascii.Error:
+ return data
diff --git a/tests/unittests/test_datasource/test_hetzner.py b/tests/unittests/test_datasource/test_hetzner.py
index a9c12597..d0879545 100644
--- a/tests/unittests/test_datasource/test_hetzner.py
+++ b/tests/unittests/test_datasource/test_hetzner.py
@@ -5,10 +5,14 @@
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.sources import DataSourceHetzner
+import cloudinit.sources.helpers.hetzner as hc_helper
from cloudinit import util, settings, helpers
from cloudinit.tests.helpers import mock, CiTestCase
+import base64
+import pytest
+
METADATA = util.load_yaml("""
hostname: cloudinit-test
instance-id: 123456
@@ -115,3 +119,22 @@ class TestDataSourceHetzner(CiTestCase):
# These are a white box attempt to ensure it did not search.
m_find_fallback.assert_not_called()
m_read_md.assert_not_called()
+
+
+class TestMaybeB64Decode:
+ """Test the maybe_b64decode helper function."""
+
+ @pytest.mark.parametrize("invalid_input", (str("not bytes"), int(4)))
+ def test_raises_error_on_non_bytes(self, invalid_input):
+ """maybe_b64decode should raise error if data is not bytes."""
+ with pytest.raises(TypeError):
+ hc_helper.maybe_b64decode(invalid_input)
+
+ @pytest.mark.parametrize("in_data,expected", [
+ # If data is not b64 encoded, then return value should be the same.
+ (b"this is my data", b"this is my data"),
+ # If data is b64 encoded, then return value should be decoded.
+ (base64.b64encode(b"data"), b"data"),
+ ])
+ def test_happy_path(self, in_data, expected):
+ assert expected == hc_helper.maybe_b64decode(in_data)