summaryrefslogtreecommitdiff
path: root/tests/unittests/sources/test_scaleway.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/sources/test_scaleway.py')
-rw-r--r--tests/unittests/sources/test_scaleway.py526
1 files changed, 526 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_scaleway.py b/tests/unittests/sources/test_scaleway.py
new file mode 100644
index 00000000..d7e8b969
--- /dev/null
+++ b/tests/unittests/sources/test_scaleway.py
@@ -0,0 +1,526 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import json
+
+import httpretty
+import requests
+
+from cloudinit import helpers, settings, sources
+from cloudinit.sources import DataSourceScaleway
+from tests.unittests.helpers import CiTestCase, HttprettyTestCase, mock
+
+
+class DataResponses(object):
+ """
+ Possible responses of the API endpoint
+ 169.254.42.42/user_data/cloud-init and
+ 169.254.42.42/vendor_data/cloud-init.
+ """
+
+ FAKE_USER_DATA = '#!/bin/bash\necho "user-data"'
+
+ @staticmethod
+ def rate_limited(method, uri, headers):
+ return 429, headers, ""
+
+ @staticmethod
+ def api_error(method, uri, headers):
+ return 500, headers, ""
+
+ @classmethod
+ def get_ok(cls, method, uri, headers):
+ return 200, headers, cls.FAKE_USER_DATA
+
+ @staticmethod
+ def empty(method, uri, headers):
+ """
+ No user data for this server.
+ """
+ return 404, headers, ""
+
+
+class MetadataResponses(object):
+ """
+ Possible responses of the metadata API.
+ """
+
+ FAKE_METADATA = {
+ "id": "00000000-0000-0000-0000-000000000000",
+ "hostname": "scaleway.host",
+ "tags": [
+ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ ],
+ "ssh_public_keys": [
+ {
+ "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
+ "fingerprint": "2048 06:ae:... login (RSA)",
+ },
+ {
+ "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ "fingerprint": "2048 06:ff:... login2 (RSA)",
+ },
+ ],
+ }
+
+ @classmethod
+ def get_ok(cls, method, uri, headers):
+ return 200, headers, json.dumps(cls.FAKE_METADATA)
+
+
+class TestOnScaleway(CiTestCase):
+ def setUp(self):
+ super(TestOnScaleway, self).setUp()
+ self.tmp = self.tmp_dir()
+
+ def install_mocks(self, fake_dmi, fake_file_exists, fake_cmdline):
+ mock, faked = fake_dmi
+ mock.return_value = "Scaleway" if faked else "Whatever"
+
+ mock, faked = fake_file_exists
+ mock.return_value = faked
+
+ mock, faked = fake_cmdline
+ mock.return_value = (
+ "initrd=initrd showopts scaleway nousb"
+ if faked
+ else "BOOT_IMAGE=/vmlinuz-3.11.0-26-generic"
+ )
+
+ @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch("os.path.exists")
+ @mock.patch("cloudinit.dmi.read_dmi_data")
+ def test_not_on_scaleway(
+ self, m_read_dmi_data, m_file_exists, m_get_cmdline
+ ):
+ self.install_mocks(
+ fake_dmi=(m_read_dmi_data, False),
+ fake_file_exists=(m_file_exists, False),
+ fake_cmdline=(m_get_cmdline, False),
+ )
+ self.assertFalse(DataSourceScaleway.on_scaleway())
+
+ # When not on Scaleway, get_data() returns False.
+ datasource = DataSourceScaleway.DataSourceScaleway(
+ settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": self.tmp})
+ )
+ self.assertFalse(datasource.get_data())
+
+ @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch("os.path.exists")
+ @mock.patch("cloudinit.dmi.read_dmi_data")
+ def test_on_scaleway_dmi(
+ self, m_read_dmi_data, m_file_exists, m_get_cmdline
+ ):
+ """
+ dmidecode returns "Scaleway".
+ """
+ # dmidecode returns "Scaleway"
+ self.install_mocks(
+ fake_dmi=(m_read_dmi_data, True),
+ fake_file_exists=(m_file_exists, False),
+ fake_cmdline=(m_get_cmdline, False),
+ )
+ self.assertTrue(DataSourceScaleway.on_scaleway())
+
+ @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch("os.path.exists")
+ @mock.patch("cloudinit.dmi.read_dmi_data")
+ def test_on_scaleway_var_run_scaleway(
+ self, m_read_dmi_data, m_file_exists, m_get_cmdline
+ ):
+ """
+ /var/run/scaleway exists.
+ """
+ self.install_mocks(
+ fake_dmi=(m_read_dmi_data, False),
+ fake_file_exists=(m_file_exists, True),
+ fake_cmdline=(m_get_cmdline, False),
+ )
+ self.assertTrue(DataSourceScaleway.on_scaleway())
+
+ @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch("os.path.exists")
+ @mock.patch("cloudinit.dmi.read_dmi_data")
+ def test_on_scaleway_cmdline(
+ self, m_read_dmi_data, m_file_exists, m_get_cmdline
+ ):
+ """
+ "scaleway" in /proc/cmdline.
+ """
+ self.install_mocks(
+ fake_dmi=(m_read_dmi_data, False),
+ fake_file_exists=(m_file_exists, False),
+ fake_cmdline=(m_get_cmdline, True),
+ )
+ self.assertTrue(DataSourceScaleway.on_scaleway())
+
+
+def get_source_address_adapter(*args, **kwargs):
+ """
+ Scaleway user/vendor data API requires to be called with a privileged port.
+
+ If the unittests are run as non-root, the user doesn't have the permission
+ to bind on ports below 1024.
+
+ This function removes the bind on a privileged address, since anyway the
+ HTTP call is mocked by httpretty.
+ """
+ kwargs.pop("source_address")
+ return requests.adapters.HTTPAdapter(*args, **kwargs)
+
+
+class TestDataSourceScaleway(HttprettyTestCase):
+ def setUp(self):
+ tmp = self.tmp_dir()
+ self.datasource = DataSourceScaleway.DataSourceScaleway(
+ settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": tmp})
+ )
+ super(TestDataSourceScaleway, self).setUp()
+
+ self.metadata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[
+ "metadata_url"
+ ]
+ self.userdata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[
+ "userdata_url"
+ ]
+ self.vendordata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[
+ "vendordata_url"
+ ]
+
+ self.add_patch(
+ "cloudinit.sources.DataSourceScaleway.on_scaleway",
+ "_m_on_scaleway",
+ return_value=True,
+ )
+ self.add_patch(
+ "cloudinit.sources.DataSourceScaleway.net.find_fallback_nic",
+ "_m_find_fallback_nic",
+ return_value="scalewaynic0",
+ )
+
+ @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
+ @mock.patch(
+ "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter",
+ get_source_address_adapter,
+ )
+ @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch("time.sleep", return_value=None)
+ def test_metadata_ok(self, sleep, m_get_cmdline, dhcpv4):
+ """
+ get_data() returns metadata, user data and vendor data.
+ """
+ m_get_cmdline.return_value = "scaleway"
+
+ # Make user data API return a valid response
+ httpretty.register_uri(
+ httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok
+ )
+ httpretty.register_uri(
+ httpretty.GET, self.userdata_url, body=DataResponses.get_ok
+ )
+ httpretty.register_uri(
+ httpretty.GET, self.vendordata_url, body=DataResponses.get_ok
+ )
+ self.datasource.get_data()
+
+ self.assertEqual(
+ self.datasource.get_instance_id(),
+ MetadataResponses.FAKE_METADATA["id"],
+ )
+ self.assertEqual(
+ self.datasource.get_public_ssh_keys().sort(),
+ [
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
+ ].sort(),
+ )
+ self.assertEqual(
+ self.datasource.get_hostname(),
+ MetadataResponses.FAKE_METADATA["hostname"],
+ )
+ self.assertEqual(
+ self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA
+ )
+ self.assertEqual(
+ self.datasource.get_vendordata_raw(), DataResponses.FAKE_USER_DATA
+ )
+ self.assertIsNone(self.datasource.availability_zone)
+ self.assertIsNone(self.datasource.region)
+ self.assertEqual(sleep.call_count, 0)
+
+ def test_ssh_keys_empty(self):
+ """
+ get_public_ssh_keys() should return empty list if no ssh key are
+ available
+ """
+ self.datasource.metadata["tags"] = []
+ self.datasource.metadata["ssh_public_keys"] = []
+ self.assertEqual(self.datasource.get_public_ssh_keys(), [])
+
+ def test_ssh_keys_only_tags(self):
+ """
+ get_public_ssh_keys() should return list of keys available in tags
+ """
+ self.datasource.metadata["tags"] = [
+ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ ]
+ self.datasource.metadata["ssh_public_keys"] = []
+ self.assertEqual(
+ self.datasource.get_public_ssh_keys().sort(),
+ [
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ ].sort(),
+ )
+
+ def test_ssh_keys_only_conf(self):
+ """
+ get_public_ssh_keys() should return list of keys available in
+ ssh_public_keys field
+ """
+ self.datasource.metadata["tags"] = []
+ self.datasource.metadata["ssh_public_keys"] = [
+ {
+ "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
+ "fingerprint": "2048 06:ae:... login (RSA)",
+ },
+ {
+ "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ "fingerprint": "2048 06:ff:... login2 (RSA)",
+ },
+ ]
+ self.assertEqual(
+ self.datasource.get_public_ssh_keys().sort(),
+ [
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
+ ].sort(),
+ )
+
+ def test_ssh_keys_both(self):
+ """
+ get_public_ssh_keys() should return a merge of keys available
+ in ssh_public_keys and tags
+ """
+ self.datasource.metadata["tags"] = [
+ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ ]
+
+ self.datasource.metadata["ssh_public_keys"] = [
+ {
+ "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
+ "fingerprint": "2048 06:ae:... login (RSA)",
+ },
+ {
+ "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ "fingerprint": "2048 06:ff:... login2 (RSA)",
+ },
+ ]
+ self.assertEqual(
+ self.datasource.get_public_ssh_keys().sort(),
+ [
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA",
+ ].sort(),
+ )
+
+ @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
+ @mock.patch(
+ "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter",
+ get_source_address_adapter,
+ )
+ @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch("time.sleep", return_value=None)
+ def test_metadata_404(self, sleep, m_get_cmdline, dhcpv4):
+ """
+ get_data() returns metadata, but no user data nor vendor data.
+ """
+ m_get_cmdline.return_value = "scaleway"
+
+ # Make user and vendor data APIs return HTTP/404, which means there is
+ # no user / vendor data for the server.
+ httpretty.register_uri(
+ httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok
+ )
+ httpretty.register_uri(
+ httpretty.GET, self.userdata_url, body=DataResponses.empty
+ )
+ httpretty.register_uri(
+ httpretty.GET, self.vendordata_url, body=DataResponses.empty
+ )
+ self.datasource.get_data()
+ self.assertIsNone(self.datasource.get_userdata_raw())
+ self.assertIsNone(self.datasource.get_vendordata_raw())
+ self.assertEqual(sleep.call_count, 0)
+
+ @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4")
+ @mock.patch(
+ "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter",
+ get_source_address_adapter,
+ )
+ @mock.patch("cloudinit.util.get_cmdline")
+ @mock.patch("time.sleep", return_value=None)
+ def test_metadata_rate_limit(self, sleep, m_get_cmdline, dhcpv4):
+ """
+ get_data() is rate limited two times by the metadata API when fetching
+ user data.
+ """
+ m_get_cmdline.return_value = "scaleway"
+
+ httpretty.register_uri(
+ httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok
+ )
+ httpretty.register_uri(
+ httpretty.GET, self.vendordata_url, body=DataResponses.empty
+ )
+
+ httpretty.register_uri(
+ httpretty.GET,
+ self.userdata_url,
+ responses=[
+ httpretty.Response(body=DataResponses.rate_limited),
+ httpretty.Response(body=DataResponses.rate_limited),
+ httpretty.Response(body=DataResponses.get_ok),
+ ],
+ )
+ self.datasource.get_data()
+ self.assertEqual(
+ self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA
+ )
+ self.assertEqual(sleep.call_count, 2)
+
+ @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
+ @mock.patch("cloudinit.util.get_cmdline")
+ def test_network_config_ok(self, m_get_cmdline, fallback_nic):
+ """
+ network_config will only generate IPv4 config if no ipv6 data is
+ available in the metadata
+ """
+ m_get_cmdline.return_value = "scaleway"
+ fallback_nic.return_value = "ens2"
+ self.datasource.metadata["ipv6"] = None
+
+ netcfg = self.datasource.network_config
+ resp = {
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "ens2",
+ "subnets": [{"type": "dhcp4"}],
+ }
+ ],
+ }
+ self.assertEqual(netcfg, resp)
+
+ @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
+ @mock.patch("cloudinit.util.get_cmdline")
+ def test_network_config_ipv6_ok(self, m_get_cmdline, fallback_nic):
+ """
+ network_config will only generate IPv4/v6 configs if ipv6 data is
+ available in the metadata
+ """
+ m_get_cmdline.return_value = "scaleway"
+ fallback_nic.return_value = "ens2"
+ self.datasource.metadata["ipv6"] = {
+ "address": "2000:abc:4444:9876::42:999",
+ "gateway": "2000:abc:4444:9876::42:000",
+ "netmask": "127",
+ }
+
+ netcfg = self.datasource.network_config
+ resp = {
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "ens2",
+ "subnets": [
+ {"type": "dhcp4"},
+ {
+ "type": "static",
+ "address": "2000:abc:4444:9876::42:999",
+ "gateway": "2000:abc:4444:9876::42:000",
+ "netmask": "127",
+ },
+ ],
+ }
+ ],
+ }
+ self.assertEqual(netcfg, resp)
+
+ @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
+ @mock.patch("cloudinit.util.get_cmdline")
+ def test_network_config_existing(self, m_get_cmdline, fallback_nic):
+ """
+ network_config() should return the same data if a network config
+ already exists
+ """
+ m_get_cmdline.return_value = "scaleway"
+ self.datasource._network_config = "0xdeadbeef"
+
+ netcfg = self.datasource.network_config
+ self.assertEqual(netcfg, "0xdeadbeef")
+
+ @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
+ @mock.patch("cloudinit.util.get_cmdline")
+ def test_network_config_unset(self, m_get_cmdline, fallback_nic):
+ """
+ _network_config will be set to sources.UNSET after the first boot.
+ Make sure it behave correctly.
+ """
+ m_get_cmdline.return_value = "scaleway"
+ fallback_nic.return_value = "ens2"
+ self.datasource.metadata["ipv6"] = None
+ self.datasource._network_config = sources.UNSET
+
+ resp = {
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "ens2",
+ "subnets": [{"type": "dhcp4"}],
+ }
+ ],
+ }
+
+ netcfg = self.datasource.network_config
+ self.assertEqual(netcfg, resp)
+
+ @mock.patch("cloudinit.sources.DataSourceScaleway.LOG.warning")
+ @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic")
+ @mock.patch("cloudinit.util.get_cmdline")
+ def test_network_config_cached_none(
+ self, m_get_cmdline, fallback_nic, logwarning
+ ):
+ """
+ network_config() should return config data if cached data is None
+ rather than sources.UNSET
+ """
+ m_get_cmdline.return_value = "scaleway"
+ fallback_nic.return_value = "ens2"
+ self.datasource.metadata["ipv6"] = None
+ self.datasource._network_config = None
+
+ resp = {
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "ens2",
+ "subnets": [{"type": "dhcp4"}],
+ }
+ ],
+ }
+
+ netcfg = self.datasource.network_config
+ self.assertEqual(netcfg, resp)
+ logwarning.assert_called_with(
+ "Found None as cached _network_config. Resetting to %s",
+ sources.UNSET,
+ )