diff options
| -rw-r--r-- | cloudinit/sources/DataSourceHetzner.py | 9 | ||||
| -rw-r--r-- | cloudinit/sources/helpers/hetzner.py | 19 | ||||
| -rw-r--r-- | tests/unittests/test_datasource/test_hetzner.py | 23 | 
3 files changed, 50 insertions, 1 deletions
| diff --git a/cloudinit/sources/DataSourceHetzner.py b/cloudinit/sources/DataSourceHetzner.py index 50298330..70e4274c 100644 --- a/cloudinit/sources/DataSourceHetzner.py +++ b/cloudinit/sources/DataSourceHetzner.py @@ -59,7 +59,14 @@ class DataSourceHetzner(sources.DataSource):                  self.userdata_address, timeout=self.timeout,                  sec_between=self.wait_retry, retries=self.retries) -        self.userdata_raw = ud +        # Hetzner cloud does not support binary user-data. So here, do a +        # base64 decode of the data if we can. The end result being that a +        # user can provide base64 encoded (possibly gzipped) data as user-data. +        # +        # The fallout is that in the event of b64 encoded user-data, +        # /var/lib/cloud-init/cloud-config.txt will not be identical to the +        # user-data provided.  It will be decoded. +        self.userdata_raw = hc_helper.maybe_b64decode(ud)          self.metadata_full = md          """hostname is name provided by user at launch.  The API enforces diff --git a/cloudinit/sources/helpers/hetzner.py b/cloudinit/sources/helpers/hetzner.py index 2554530d..72edb023 100644 --- a/cloudinit/sources/helpers/hetzner.py +++ b/cloudinit/sources/helpers/hetzner.py @@ -7,6 +7,9 @@ from cloudinit import log as logging  from cloudinit import url_helper  from cloudinit import util +import base64 +import binascii +  LOG = logging.getLogger(__name__) @@ -24,3 +27,19 @@ def read_userdata(url, timeout=2, sec_between=2, retries=30):      if not response.ok():          raise RuntimeError("unable to read userdata at %s" % url)      return response.contents + + +def maybe_b64decode(data: bytes) -> bytes: +    """base64 decode data + +    If data is base64 encoded bytes, return b64decode(data). +    If not, return data unmodified. + +    @param data: data as bytes. TypeError is raised if not bytes. +    """ +    if not isinstance(data, bytes): +        raise TypeError("data is '%s', expected bytes" % type(data)) +    try: +        return base64.b64decode(data, validate=True) +    except binascii.Error: +        return data diff --git a/tests/unittests/test_datasource/test_hetzner.py b/tests/unittests/test_datasource/test_hetzner.py index a9c12597..d0879545 100644 --- a/tests/unittests/test_datasource/test_hetzner.py +++ b/tests/unittests/test_datasource/test_hetzner.py @@ -5,10 +5,14 @@  # This file is part of cloud-init. See LICENSE file for license information.  from cloudinit.sources import DataSourceHetzner +import cloudinit.sources.helpers.hetzner as hc_helper  from cloudinit import util, settings, helpers  from cloudinit.tests.helpers import mock, CiTestCase +import base64 +import pytest +  METADATA = util.load_yaml("""  hostname: cloudinit-test  instance-id: 123456 @@ -115,3 +119,22 @@ class TestDataSourceHetzner(CiTestCase):          # These are a white box attempt to ensure it did not search.          m_find_fallback.assert_not_called()          m_read_md.assert_not_called() + + +class TestMaybeB64Decode: +    """Test the maybe_b64decode helper function.""" + +    @pytest.mark.parametrize("invalid_input", (str("not bytes"), int(4))) +    def test_raises_error_on_non_bytes(self, invalid_input): +        """maybe_b64decode should raise error if data is not bytes.""" +        with pytest.raises(TypeError): +            hc_helper.maybe_b64decode(invalid_input) + +    @pytest.mark.parametrize("in_data,expected", [ +        # If data is not b64 encoded, then return value should be the same. +        (b"this is my data", b"this is my data"), +        # If data is b64 encoded, then return value should be decoded. +        (base64.b64encode(b"data"), b"data"), +    ]) +    def test_happy_path(self, in_data, expected): +        assert expected == hc_helper.maybe_b64decode(in_data) | 
