summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_scaleway.py
diff options
context:
space:
mode:
authorBrett Holman <bholman.devel@gmail.com>2021-12-03 13:11:46 -0700
committerGitHub <noreply@github.com>2021-12-03 13:11:46 -0700
commit039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 (patch)
tree5f1b09486ccaf98ee8159de58d9a2a1ef0af5dc1 /tests/unittests/test_datasource/test_scaleway.py
parentffa6fc88249aa080aa31811a45569a45e567418a (diff)
downloadvyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.tar.gz
vyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.zip
Reorganize unit test locations under tests/unittests (#1126)
This attempts to standardize unit test file location under test/unittests/ such that any source file located at cloudinit/path/to/file.py may have a corresponding unit test file at test/unittests/path/to/test_file.py. Noteworthy Comments: ==================== Four different duplicate test files existed: test_{gpg,util,cc_mounts,cc_resolv_conf}.py Each of these duplicate file pairs has been merged together. This is a break in git history for these files. The test suite appears to have a dependency on test order. Changing test order causes some tests to fail. This should be rectified, but for now some tests have been modified in tests/unittests/config/test_set_passwords.py. A helper class name starts with "Test" which causes pytest to try executing it as a test case, which then throws warnings "due to Class having __init__()". Silence by changing the name of the class. # helpers.py is imported in many test files, import paths change cloudinit/tests/helpers.py -> tests/unittests/helpers.py # Move directories: cloudinit/distros/tests -> tests/unittests/distros cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel cloudinit/cmd/tests -> tests/unittests/cmd/ cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers cloudinit/sources/tests -> tests/unittests/sources cloudinit/net/tests -> tests/unittests/net cloudinit/config/tests -> tests/unittests/config cloudinit/analyze/tests/ -> tests/unittests/analyze/ # Standardize tests already in tests/unittests/ test_datasource -> sources test_distros -> distros test_vmware -> sources/vmware test_handler -> config # this contains cloudconfig module tests test_runs -> runs
Diffstat (limited to 'tests/unittests/test_datasource/test_scaleway.py')
-rw-r--r--tests/unittests/test_datasource/test_scaleway.py473
1 files changed, 0 insertions, 473 deletions
diff --git a/tests/unittests/test_datasource/test_scaleway.py b/tests/unittests/test_datasource/test_scaleway.py
deleted file mode 100644
index f9e968c5..00000000
--- a/tests/unittests/test_datasource/test_scaleway.py
+++ /dev/null
@@ -1,473 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import json
-
-import httpretty
-import requests
-
-from cloudinit import helpers
-from cloudinit import settings
-from cloudinit import sources
-from cloudinit.sources import DataSourceScaleway
-
-from cloudinit.tests.helpers import mock, HttprettyTestCase, CiTestCase
-
-
-class DataResponses(object):
- """
- Possible responses of the API endpoint
- 169.254.42.42/user_data/cloud-init and
- 169.254.42.42/vendor_data/cloud-init.
- """
-
- FAKE_USER_DATA = '#!/bin/bash\necho "user-data"'
-
- @staticmethod
- def rate_limited(method, uri, headers):
- return 429, headers, ''
-
- @staticmethod
- def api_error(method, uri, headers):
- return 500, headers, ''
-
- @classmethod
- def get_ok(cls, method, uri, headers):
- return 200, headers, cls.FAKE_USER_DATA
-
- @staticmethod
- def empty(method, uri, headers):
- """
- No user data for this server.
- """
- return 404, headers, ''
-
-
-class MetadataResponses(object):
- """
- Possible responses of the metadata API.
- """
-
- FAKE_METADATA = {
- 'id': '00000000-0000-0000-0000-000000000000',
- 'hostname': 'scaleway.host',
- 'tags': [
- "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
- ],
- 'ssh_public_keys': [{
- 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
- 'fingerprint': '2048 06:ae:... login (RSA)'
- }, {
- 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
- 'fingerprint': '2048 06:ff:... login2 (RSA)'
- }]
- }
-
- @classmethod
- def get_ok(cls, method, uri, headers):
- return 200, headers, json.dumps(cls.FAKE_METADATA)
-
-
-class TestOnScaleway(CiTestCase):
-
- def setUp(self):
- super(TestOnScaleway, self).setUp()
- self.tmp = self.tmp_dir()
-
- def install_mocks(self, fake_dmi, fake_file_exists, fake_cmdline):
- mock, faked = fake_dmi
- mock.return_value = 'Scaleway' if faked else 'Whatever'
-
- mock, faked = fake_file_exists
- mock.return_value = faked
-
- mock, faked = fake_cmdline
- mock.return_value = \
- 'initrd=initrd showopts scaleway nousb' if faked \
- else 'BOOT_IMAGE=/vmlinuz-3.11.0-26-generic'
-
- @mock.patch('cloudinit.util.get_cmdline')
- @mock.patch('os.path.exists')
- @mock.patch('cloudinit.dmi.read_dmi_data')
- def test_not_on_scaleway(self, m_read_dmi_data, m_file_exists,
- m_get_cmdline):
- self.install_mocks(
- fake_dmi=(m_read_dmi_data, False),
- fake_file_exists=(m_file_exists, False),
- fake_cmdline=(m_get_cmdline, False)
- )
- self.assertFalse(DataSourceScaleway.on_scaleway())
-
- # When not on Scaleway, get_data() returns False.
- datasource = DataSourceScaleway.DataSourceScaleway(
- settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': self.tmp})
- )
- self.assertFalse(datasource.get_data())
-
- @mock.patch('cloudinit.util.get_cmdline')
- @mock.patch('os.path.exists')
- @mock.patch('cloudinit.dmi.read_dmi_data')
- def test_on_scaleway_dmi(self, m_read_dmi_data, m_file_exists,
- m_get_cmdline):
- """
- dmidecode returns "Scaleway".
- """
- # dmidecode returns "Scaleway"
- self.install_mocks(
- fake_dmi=(m_read_dmi_data, True),
- fake_file_exists=(m_file_exists, False),
- fake_cmdline=(m_get_cmdline, False)
- )
- self.assertTrue(DataSourceScaleway.on_scaleway())
-
- @mock.patch('cloudinit.util.get_cmdline')
- @mock.patch('os.path.exists')
- @mock.patch('cloudinit.dmi.read_dmi_data')
- def test_on_scaleway_var_run_scaleway(self, m_read_dmi_data, m_file_exists,
- m_get_cmdline):
- """
- /var/run/scaleway exists.
- """
- self.install_mocks(
- fake_dmi=(m_read_dmi_data, False),
- fake_file_exists=(m_file_exists, True),
- fake_cmdline=(m_get_cmdline, False)
- )
- self.assertTrue(DataSourceScaleway.on_scaleway())
-
- @mock.patch('cloudinit.util.get_cmdline')
- @mock.patch('os.path.exists')
- @mock.patch('cloudinit.dmi.read_dmi_data')
- def test_on_scaleway_cmdline(self, m_read_dmi_data, m_file_exists,
- m_get_cmdline):
- """
- "scaleway" in /proc/cmdline.
- """
- self.install_mocks(
- fake_dmi=(m_read_dmi_data, False),
- fake_file_exists=(m_file_exists, False),
- fake_cmdline=(m_get_cmdline, True)
- )
- self.assertTrue(DataSourceScaleway.on_scaleway())
-
-
-def get_source_address_adapter(*args, **kwargs):
- """
- Scaleway user/vendor data API requires to be called with a privileged port.
-
- If the unittests are run as non-root, the user doesn't have the permission
- to bind on ports below 1024.
-
- This function removes the bind on a privileged address, since anyway the
- HTTP call is mocked by httpretty.
- """
- kwargs.pop('source_address')
- return requests.adapters.HTTPAdapter(*args, **kwargs)
-
-
-class TestDataSourceScaleway(HttprettyTestCase):
-
- def setUp(self):
- tmp = self.tmp_dir()
- self.datasource = DataSourceScaleway.DataSourceScaleway(
- settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': tmp})
- )
- super(TestDataSourceScaleway, self).setUp()
-
- self.metadata_url = \
- DataSourceScaleway.BUILTIN_DS_CONFIG['metadata_url']
- self.userdata_url = \
- DataSourceScaleway.BUILTIN_DS_CONFIG['userdata_url']
- self.vendordata_url = \
- DataSourceScaleway.BUILTIN_DS_CONFIG['vendordata_url']
-
- self.add_patch('cloudinit.sources.DataSourceScaleway.on_scaleway',
- '_m_on_scaleway', return_value=True)
- self.add_patch(
- 'cloudinit.sources.DataSourceScaleway.net.find_fallback_nic',
- '_m_find_fallback_nic', return_value='scalewaynic0')
-
- @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4')
- @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter',
- get_source_address_adapter)
- @mock.patch('cloudinit.util.get_cmdline')
- @mock.patch('time.sleep', return_value=None)
- def test_metadata_ok(self, sleep, m_get_cmdline, dhcpv4):
- """
- get_data() returns metadata, user data and vendor data.
- """
- m_get_cmdline.return_value = 'scaleway'
-
- # Make user data API return a valid response
- httpretty.register_uri(httpretty.GET, self.metadata_url,
- body=MetadataResponses.get_ok)
- httpretty.register_uri(httpretty.GET, self.userdata_url,
- body=DataResponses.get_ok)
- httpretty.register_uri(httpretty.GET, self.vendordata_url,
- body=DataResponses.get_ok)
- self.datasource.get_data()
-
- self.assertEqual(self.datasource.get_instance_id(),
- MetadataResponses.FAKE_METADATA['id'])
- self.assertEqual(self.datasource.get_public_ssh_keys().sort(), [
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD',
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
- ].sort())
- self.assertEqual(self.datasource.get_hostname(),
- MetadataResponses.FAKE_METADATA['hostname'])
- self.assertEqual(self.datasource.get_userdata_raw(),
- DataResponses.FAKE_USER_DATA)
- self.assertEqual(self.datasource.get_vendordata_raw(),
- DataResponses.FAKE_USER_DATA)
- self.assertIsNone(self.datasource.availability_zone)
- self.assertIsNone(self.datasource.region)
- self.assertEqual(sleep.call_count, 0)
-
- def test_ssh_keys_empty(self):
- """
- get_public_ssh_keys() should return empty list if no ssh key are
- available
- """
- self.datasource.metadata['tags'] = []
- self.datasource.metadata['ssh_public_keys'] = []
- self.assertEqual(self.datasource.get_public_ssh_keys(), [])
-
- def test_ssh_keys_only_tags(self):
- """
- get_public_ssh_keys() should return list of keys available in tags
- """
- self.datasource.metadata['tags'] = [
- "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
- "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABCCCCC",
- ]
- self.datasource.metadata['ssh_public_keys'] = []
- self.assertEqual(self.datasource.get_public_ssh_keys().sort(), [
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD',
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
- ].sort())
-
- def test_ssh_keys_only_conf(self):
- """
- get_public_ssh_keys() should return list of keys available in
- ssh_public_keys field
- """
- self.datasource.metadata['tags'] = []
- self.datasource.metadata['ssh_public_keys'] = [{
- 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
- 'fingerprint': '2048 06:ae:... login (RSA)'
- }, {
- 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
- 'fingerprint': '2048 06:ff:... login2 (RSA)'
- }]
- self.assertEqual(self.datasource.get_public_ssh_keys().sort(), [
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD',
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
- ].sort())
-
- def test_ssh_keys_both(self):
- """
- get_public_ssh_keys() should return a merge of keys available
- in ssh_public_keys and tags
- """
- self.datasource.metadata['tags'] = [
- "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
- ]
-
- self.datasource.metadata['ssh_public_keys'] = [{
- 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
- 'fingerprint': '2048 06:ae:... login (RSA)'
- }, {
- 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
- 'fingerprint': '2048 06:ff:... login2 (RSA)'
- }]
- self.assertEqual(self.datasource.get_public_ssh_keys().sort(), [
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD',
- 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
- ].sort())
-
- @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4')
- @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter',
- get_source_address_adapter)
- @mock.patch('cloudinit.util.get_cmdline')
- @mock.patch('time.sleep', return_value=None)
- def test_metadata_404(self, sleep, m_get_cmdline, dhcpv4):
- """
- get_data() returns metadata, but no user data nor vendor data.
- """
- m_get_cmdline.return_value = 'scaleway'
-
- # Make user and vendor data APIs return HTTP/404, which means there is
- # no user / vendor data for the server.
- httpretty.register_uri(httpretty.GET, self.metadata_url,
- body=MetadataResponses.get_ok)
- httpretty.register_uri(httpretty.GET, self.userdata_url,
- body=DataResponses.empty)
- httpretty.register_uri(httpretty.GET, self.vendordata_url,
- body=DataResponses.empty)
- self.datasource.get_data()
- self.assertIsNone(self.datasource.get_userdata_raw())
- self.assertIsNone(self.datasource.get_vendordata_raw())
- self.assertEqual(sleep.call_count, 0)
-
- @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4')
- @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter',
- get_source_address_adapter)
- @mock.patch('cloudinit.util.get_cmdline')
- @mock.patch('time.sleep', return_value=None)
- def test_metadata_rate_limit(self, sleep, m_get_cmdline, dhcpv4):
- """
- get_data() is rate limited two times by the metadata API when fetching
- user data.
- """
- m_get_cmdline.return_value = 'scaleway'
-
- httpretty.register_uri(httpretty.GET, self.metadata_url,
- body=MetadataResponses.get_ok)
- httpretty.register_uri(httpretty.GET, self.vendordata_url,
- body=DataResponses.empty)
-
- httpretty.register_uri(
- httpretty.GET, self.userdata_url,
- responses=[
- httpretty.Response(body=DataResponses.rate_limited),
- httpretty.Response(body=DataResponses.rate_limited),
- httpretty.Response(body=DataResponses.get_ok),
- ]
- )
- self.datasource.get_data()
- self.assertEqual(self.datasource.get_userdata_raw(),
- DataResponses.FAKE_USER_DATA)
- self.assertEqual(sleep.call_count, 2)
-
- @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
- @mock.patch('cloudinit.util.get_cmdline')
- def test_network_config_ok(self, m_get_cmdline, fallback_nic):
- """
- network_config will only generate IPv4 config if no ipv6 data is
- available in the metadata
- """
- m_get_cmdline.return_value = 'scaleway'
- fallback_nic.return_value = 'ens2'
- self.datasource.metadata['ipv6'] = None
-
- netcfg = self.datasource.network_config
- resp = {
- 'version': 1,
- 'config': [
- {
- 'type': 'physical',
- 'name': 'ens2',
- 'subnets': [{'type': 'dhcp4'}]
- }
- ]
- }
- self.assertEqual(netcfg, resp)
-
- @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
- @mock.patch('cloudinit.util.get_cmdline')
- def test_network_config_ipv6_ok(self, m_get_cmdline, fallback_nic):
- """
- network_config will only generate IPv4/v6 configs if ipv6 data is
- available in the metadata
- """
- m_get_cmdline.return_value = 'scaleway'
- fallback_nic.return_value = 'ens2'
- self.datasource.metadata['ipv6'] = {
- 'address': '2000:abc:4444:9876::42:999',
- 'gateway': '2000:abc:4444:9876::42:000',
- 'netmask': '127',
- }
-
- netcfg = self.datasource.network_config
- resp = {
- 'version': 1,
- 'config': [
- {
- 'type': 'physical',
- 'name': 'ens2',
- 'subnets': [
- {
- 'type': 'dhcp4'
- },
- {
- 'type': 'static',
- 'address': '2000:abc:4444:9876::42:999',
- 'gateway': '2000:abc:4444:9876::42:000',
- 'netmask': '127',
- }
- ]
- }
- ]
- }
- self.assertEqual(netcfg, resp)
-
- @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
- @mock.patch('cloudinit.util.get_cmdline')
- def test_network_config_existing(self, m_get_cmdline, fallback_nic):
- """
- network_config() should return the same data if a network config
- already exists
- """
- m_get_cmdline.return_value = 'scaleway'
- self.datasource._network_config = '0xdeadbeef'
-
- netcfg = self.datasource.network_config
- self.assertEqual(netcfg, '0xdeadbeef')
-
- @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
- @mock.patch('cloudinit.util.get_cmdline')
- def test_network_config_unset(self, m_get_cmdline, fallback_nic):
- """
- _network_config will be set to sources.UNSET after the first boot.
- Make sure it behave correctly.
- """
- m_get_cmdline.return_value = 'scaleway'
- fallback_nic.return_value = 'ens2'
- self.datasource.metadata['ipv6'] = None
- self.datasource._network_config = sources.UNSET
-
- resp = {
- 'version': 1,
- 'config': [
- {
- 'type': 'physical',
- 'name': 'ens2',
- 'subnets': [{'type': 'dhcp4'}]
- }
- ]
- }
-
- netcfg = self.datasource.network_config
- self.assertEqual(netcfg, resp)
-
- @mock.patch('cloudinit.sources.DataSourceScaleway.LOG.warning')
- @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
- @mock.patch('cloudinit.util.get_cmdline')
- def test_network_config_cached_none(self, m_get_cmdline, fallback_nic,
- logwarning):
- """
- network_config() should return config data if cached data is None
- rather than sources.UNSET
- """
- m_get_cmdline.return_value = 'scaleway'
- fallback_nic.return_value = 'ens2'
- self.datasource.metadata['ipv6'] = None
- self.datasource._network_config = None
-
- resp = {
- 'version': 1,
- 'config': [
- {
- 'type': 'physical',
- 'name': 'ens2',
- 'subnets': [{'type': 'dhcp4'}]
- }
- ]
- }
-
- netcfg = self.datasource.network_config
- self.assertEqual(netcfg, resp)
- logwarning.assert_called_with('Found None as cached _network_config. '
- 'Resetting to %s', sources.UNSET)