# This file is part of cloud-init. See LICENSE file for license information. import json import httpretty import requests from cloudinit import helpers, settings, sources from cloudinit.sources import DataSourceScaleway from tests.unittests.helpers import CiTestCase, HttprettyTestCase, mock class DataResponses(object): """ Possible responses of the API endpoint 169.254.42.42/user_data/cloud-init and 169.254.42.42/vendor_data/cloud-init. """ FAKE_USER_DATA = '#!/bin/bash\necho "user-data"' @staticmethod def rate_limited(method, uri, headers): return 429, headers, "" @staticmethod def api_error(method, uri, headers): return 500, headers, "" @classmethod def get_ok(cls, method, uri, headers): return 200, headers, cls.FAKE_USER_DATA @staticmethod def empty(method, uri, headers): """ No user data for this server. """ return 404, headers, "" class MetadataResponses(object): """ Possible responses of the metadata API. """ FAKE_METADATA = { "id": "00000000-0000-0000-0000-000000000000", "hostname": "scaleway.host", "tags": [ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD", ], "ssh_public_keys": [ { "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", "fingerprint": "2048 06:ae:... login (RSA)", }, { "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", "fingerprint": "2048 06:ff:... login2 (RSA)", }, ], } @classmethod def get_ok(cls, method, uri, headers): return 200, headers, json.dumps(cls.FAKE_METADATA) class TestOnScaleway(CiTestCase): def setUp(self): super(TestOnScaleway, self).setUp() self.tmp = self.tmp_dir() def install_mocks(self, fake_dmi, fake_file_exists, fake_cmdline): mock, faked = fake_dmi mock.return_value = "Scaleway" if faked else "Whatever" mock, faked = fake_file_exists mock.return_value = faked mock, faked = fake_cmdline mock.return_value = ( "initrd=initrd showopts scaleway nousb" if faked else "BOOT_IMAGE=/vmlinuz-3.11.0-26-generic" ) @mock.patch("cloudinit.util.get_cmdline") @mock.patch("os.path.exists") @mock.patch("cloudinit.dmi.read_dmi_data") def test_not_on_scaleway( self, m_read_dmi_data, m_file_exists, m_get_cmdline ): self.install_mocks( fake_dmi=(m_read_dmi_data, False), fake_file_exists=(m_file_exists, False), fake_cmdline=(m_get_cmdline, False), ) self.assertFalse(DataSourceScaleway.on_scaleway()) # When not on Scaleway, get_data() returns False. datasource = DataSourceScaleway.DataSourceScaleway( settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": self.tmp}) ) self.assertFalse(datasource.get_data()) @mock.patch("cloudinit.util.get_cmdline") @mock.patch("os.path.exists") @mock.patch("cloudinit.dmi.read_dmi_data") def test_on_scaleway_dmi( self, m_read_dmi_data, m_file_exists, m_get_cmdline ): """ dmidecode returns "Scaleway". """ # dmidecode returns "Scaleway" self.install_mocks( fake_dmi=(m_read_dmi_data, True), fake_file_exists=(m_file_exists, False), fake_cmdline=(m_get_cmdline, False), ) self.assertTrue(DataSourceScaleway.on_scaleway()) @mock.patch("cloudinit.util.get_cmdline") @mock.patch("os.path.exists") @mock.patch("cloudinit.dmi.read_dmi_data") def test_on_scaleway_var_run_scaleway( self, m_read_dmi_data, m_file_exists, m_get_cmdline ): """ /var/run/scaleway exists. """ self.install_mocks( fake_dmi=(m_read_dmi_data, False), fake_file_exists=(m_file_exists, True), fake_cmdline=(m_get_cmdline, False), ) self.assertTrue(DataSourceScaleway.on_scaleway()) @mock.patch("cloudinit.util.get_cmdline") @mock.patch("os.path.exists") @mock.patch("cloudinit.dmi.read_dmi_data") def test_on_scaleway_cmdline( self, m_read_dmi_data, m_file_exists, m_get_cmdline ): """ "scaleway" in /proc/cmdline. """ self.install_mocks( fake_dmi=(m_read_dmi_data, False), fake_file_exists=(m_file_exists, False), fake_cmdline=(m_get_cmdline, True), ) self.assertTrue(DataSourceScaleway.on_scaleway()) def get_source_address_adapter(*args, **kwargs): """ Scaleway user/vendor data API requires to be called with a privileged port. If the unittests are run as non-root, the user doesn't have the permission to bind on ports below 1024. This function removes the bind on a privileged address, since anyway the HTTP call is mocked by httpretty. """ kwargs.pop("source_address") return requests.adapters.HTTPAdapter(*args, **kwargs) class TestDataSourceScaleway(HttprettyTestCase): def setUp(self): tmp = self.tmp_dir() self.datasource = DataSourceScaleway.DataSourceScaleway( settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": tmp}) ) super(TestDataSourceScaleway, self).setUp() self.metadata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[ "metadata_url" ] self.userdata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[ "userdata_url" ] self.vendordata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[ "vendordata_url" ] self.add_patch( "cloudinit.sources.DataSourceScaleway.on_scaleway", "_m_on_scaleway", return_value=True, ) self.add_patch( "cloudinit.sources.DataSourceScaleway.net.find_fallback_nic", "_m_find_fallback_nic", return_value="scalewaynic0", ) @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4") @mock.patch( "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter", get_source_address_adapter, ) @mock.patch("cloudinit.util.get_cmdline") @mock.patch("time.sleep", return_value=None) def test_metadata_ok(self, sleep, m_get_cmdline, dhcpv4): """ get_data() returns metadata, user data and vendor data. """ m_get_cmdline.return_value = "scaleway" # Make user data API return a valid response httpretty.register_uri( httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok ) httpretty.register_uri( httpretty.GET, self.userdata_url, body=DataResponses.get_ok ) httpretty.register_uri( httpretty.GET, self.vendordata_url, body=DataResponses.get_ok ) self.datasource.get_data() self.assertEqual( self.datasource.get_instance_id(), MetadataResponses.FAKE_METADATA["id"], ) self.assertEqual( self.datasource.get_public_ssh_keys().sort(), [ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", ].sort(), ) self.assertEqual( self.datasource.get_hostname(), MetadataResponses.FAKE_METADATA["hostname"], ) self.assertEqual( self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA ) self.assertEqual( self.datasource.get_vendordata_raw(), DataResponses.FAKE_USER_DATA ) self.assertIsNone(self.datasource.availability_zone) self.assertIsNone(self.datasource.region) self.assertEqual(sleep.call_count, 0) def test_ssh_keys_empty(self): """ get_public_ssh_keys() should return empty list if no ssh key are available """ self.datasource.metadata["tags"] = [] self.datasource.metadata["ssh_public_keys"] = [] self.assertEqual(self.datasource.get_public_ssh_keys(), []) def test_ssh_keys_only_tags(self): """ get_public_ssh_keys() should return list of keys available in tags """ self.datasource.metadata["tags"] = [ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD", "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABCCCCC", ] self.datasource.metadata["ssh_public_keys"] = [] self.assertEqual( self.datasource.get_public_ssh_keys().sort(), [ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", ].sort(), ) def test_ssh_keys_only_conf(self): """ get_public_ssh_keys() should return list of keys available in ssh_public_keys field """ self.datasource.metadata["tags"] = [] self.datasource.metadata["ssh_public_keys"] = [ { "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", "fingerprint": "2048 06:ae:... login (RSA)", }, { "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", "fingerprint": "2048 06:ff:... login2 (RSA)", }, ] self.assertEqual( self.datasource.get_public_ssh_keys().sort(), [ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", ].sort(), ) def test_ssh_keys_both(self): """ get_public_ssh_keys() should return a merge of keys available in ssh_public_keys and tags """ self.datasource.metadata["tags"] = [ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD", ] self.datasource.metadata["ssh_public_keys"] = [ { "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", "fingerprint": "2048 06:ae:... login (RSA)", }, { "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", "fingerprint": "2048 06:ff:... login2 (RSA)", }, ] self.assertEqual( self.datasource.get_public_ssh_keys().sort(), [ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD", "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", ].sort(), ) @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4") @mock.patch( "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter", get_source_address_adapter, ) @mock.patch("cloudinit.util.get_cmdline") @mock.patch("time.sleep", return_value=None) def test_metadata_404(self, sleep, m_get_cmdline, dhcpv4): """ get_data() returns metadata, but no user data nor vendor data. """ m_get_cmdline.return_value = "scaleway" # Make user and vendor data APIs return HTTP/404, which means there is # no user / vendor data for the server. httpretty.register_uri( httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok ) httpretty.register_uri( httpretty.GET, self.userdata_url, body=DataResponses.empty ) httpretty.register_uri( httpretty.GET, self.vendordata_url, body=DataResponses.empty ) self.datasource.get_data() self.assertIsNone(self.datasource.get_userdata_raw()) self.assertIsNone(self.datasource.get_vendordata_raw()) self.assertEqual(sleep.call_count, 0) @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4") @mock.patch( "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter", get_source_address_adapter, ) @mock.patch("cloudinit.util.get_cmdline") @mock.patch("time.sleep", return_value=None) def test_metadata_rate_limit(self, sleep, m_get_cmdline, dhcpv4): """ get_data() is rate limited two times by the metadata API when fetching user data. """ m_get_cmdline.return_value = "scaleway" httpretty.register_uri( httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok ) httpretty.register_uri( httpretty.GET, self.vendordata_url, body=DataResponses.empty ) httpretty.register_uri( httpretty.GET, self.userdata_url, responses=[ httpretty.Response(body=DataResponses.rate_limited), httpretty.Response(body=DataResponses.rate_limited), httpretty.Response(body=DataResponses.get_ok), ], ) self.datasource.get_data() self.assertEqual( self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA ) self.assertEqual(sleep.call_count, 2) @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") @mock.patch("cloudinit.util.get_cmdline") def test_network_config_ok(self, m_get_cmdline, fallback_nic): """ network_config will only generate IPv4 config if no ipv6 data is available in the metadata """ m_get_cmdline.return_value = "scaleway" fallback_nic.return_value = "ens2" self.datasource.metadata["ipv6"] = None netcfg = self.datasource.network_config resp = { "version": 1, "config": [ { "type": "physical", "name": "ens2", "subnets": [{"type": "dhcp4"}], } ], } self.assertEqual(netcfg, resp) @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") @mock.patch("cloudinit.util.get_cmdline") def test_network_config_ipv6_ok(self, m_get_cmdline, fallback_nic): """ network_config will only generate IPv4/v6 configs if ipv6 data is available in the metadata """ m_get_cmdline.return_value = "scaleway" fallback_nic.return_value = "ens2" self.datasource.metadata["ipv6"] = { "address": "2000:abc:4444:9876::42:999", "gateway": "2000:abc:4444:9876::42:000", "netmask": "127", } netcfg = self.datasource.network_config resp = { "version": 1, "config": [ { "type": "physical", "name": "ens2", "subnets": [ {"type": "dhcp4"}, { "type": "static", "address": "2000:abc:4444:9876::42:999", "gateway": "2000:abc:4444:9876::42:000", "netmask": "127", }, ], } ], } self.assertEqual(netcfg, resp) @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") @mock.patch("cloudinit.util.get_cmdline") def test_network_config_existing(self, m_get_cmdline, fallback_nic): """ network_config() should return the same data if a network config already exists """ m_get_cmdline.return_value = "scaleway" self.datasource._network_config = "0xdeadbeef" netcfg = self.datasource.network_config self.assertEqual(netcfg, "0xdeadbeef") @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") @mock.patch("cloudinit.util.get_cmdline") def test_network_config_unset(self, m_get_cmdline, fallback_nic): """ _network_config will be set to sources.UNSET after the first boot. Make sure it behave correctly. """ m_get_cmdline.return_value = "scaleway" fallback_nic.return_value = "ens2" self.datasource.metadata["ipv6"] = None self.datasource._network_config = sources.UNSET resp = { "version": 1, "config": [ { "type": "physical", "name": "ens2", "subnets": [{"type": "dhcp4"}], } ], } netcfg = self.datasource.network_config self.assertEqual(netcfg, resp) @mock.patch("cloudinit.sources.DataSourceScaleway.LOG.warning") @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") @mock.patch("cloudinit.util.get_cmdline") def test_network_config_cached_none( self, m_get_cmdline, fallback_nic, logwarning ): """ network_config() should return config data if cached data is None rather than sources.UNSET """ m_get_cmdline.return_value = "scaleway" fallback_nic.return_value = "ens2" self.datasource.metadata["ipv6"] = None self.datasource._network_config = None resp = { "version": 1, "config": [ { "type": "physical", "name": "ens2", "subnets": [{"type": "dhcp4"}], } ], } netcfg = self.datasource.network_config self.assertEqual(netcfg, resp) logwarning.assert_called_with( "Found None as cached _network_config. Resetting to %s", sources.UNSET, )