diff options
author | James Falcon <james.falcon@canonical.com> | 2021-12-15 20:16:38 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-15 19:16:38 -0700 |
commit | bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf (patch) | |
tree | 1fbb3269fc87e39832e3286ef42eefd2b23fcd44 /tests/unittests/sources/test_scaleway.py | |
parent | 2bcf4fa972fde686c2e3141c58e640640b44dd00 (diff) | |
download | vyos-cloud-init-bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf.tar.gz vyos-cloud-init-bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf.zip |
Adopt Black and isort (SC-700) (#1157)
Applied Black and isort, fixed any linting issues, updated tox.ini
and CI.
Diffstat (limited to 'tests/unittests/sources/test_scaleway.py')
-rw-r--r-- | tests/unittests/sources/test_scaleway.py | 481 |
1 files changed, 267 insertions, 214 deletions
diff --git a/tests/unittests/sources/test_scaleway.py b/tests/unittests/sources/test_scaleway.py index 33ae26b8..d7e8b969 100644 --- a/tests/unittests/sources/test_scaleway.py +++ b/tests/unittests/sources/test_scaleway.py @@ -5,12 +5,9 @@ import json import httpretty import requests -from cloudinit import helpers -from cloudinit import settings -from cloudinit import sources +from cloudinit import helpers, settings, sources from cloudinit.sources import DataSourceScaleway - -from tests.unittests.helpers import mock, HttprettyTestCase, CiTestCase +from tests.unittests.helpers import CiTestCase, HttprettyTestCase, mock class DataResponses(object): @@ -24,11 +21,11 @@ class DataResponses(object): @staticmethod def rate_limited(method, uri, headers): - return 429, headers, '' + return 429, headers, "" @staticmethod def api_error(method, uri, headers): - return 500, headers, '' + return 500, headers, "" @classmethod def get_ok(cls, method, uri, headers): @@ -39,7 +36,7 @@ class DataResponses(object): """ No user data for this server. """ - return 404, headers, '' + return 404, headers, "" class MetadataResponses(object): @@ -48,18 +45,21 @@ class MetadataResponses(object): """ FAKE_METADATA = { - 'id': '00000000-0000-0000-0000-000000000000', - 'hostname': 'scaleway.host', - 'tags': [ + "id": "00000000-0000-0000-0000-000000000000", + "hostname": "scaleway.host", + "tags": [ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD", ], - 'ssh_public_keys': [{ - 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA', - 'fingerprint': '2048 06:ae:... login (RSA)' - }, { - 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC', - 'fingerprint': '2048 06:ff:... login2 (RSA)' - }] + "ssh_public_keys": [ + { + "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", + "fingerprint": "2048 06:ae:... login (RSA)", + }, + { + "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", + "fingerprint": "2048 06:ff:... login2 (RSA)", + }, + ], } @classmethod @@ -68,46 +68,49 @@ class MetadataResponses(object): class TestOnScaleway(CiTestCase): - def setUp(self): super(TestOnScaleway, self).setUp() self.tmp = self.tmp_dir() def install_mocks(self, fake_dmi, fake_file_exists, fake_cmdline): mock, faked = fake_dmi - mock.return_value = 'Scaleway' if faked else 'Whatever' + mock.return_value = "Scaleway" if faked else "Whatever" mock, faked = fake_file_exists mock.return_value = faked mock, faked = fake_cmdline - mock.return_value = \ - 'initrd=initrd showopts scaleway nousb' if faked \ - else 'BOOT_IMAGE=/vmlinuz-3.11.0-26-generic' - - @mock.patch('cloudinit.util.get_cmdline') - @mock.patch('os.path.exists') - @mock.patch('cloudinit.dmi.read_dmi_data') - def test_not_on_scaleway(self, m_read_dmi_data, m_file_exists, - m_get_cmdline): + mock.return_value = ( + "initrd=initrd showopts scaleway nousb" + if faked + else "BOOT_IMAGE=/vmlinuz-3.11.0-26-generic" + ) + + @mock.patch("cloudinit.util.get_cmdline") + @mock.patch("os.path.exists") + @mock.patch("cloudinit.dmi.read_dmi_data") + def test_not_on_scaleway( + self, m_read_dmi_data, m_file_exists, m_get_cmdline + ): self.install_mocks( fake_dmi=(m_read_dmi_data, False), fake_file_exists=(m_file_exists, False), - fake_cmdline=(m_get_cmdline, False) + fake_cmdline=(m_get_cmdline, False), ) self.assertFalse(DataSourceScaleway.on_scaleway()) # When not on Scaleway, get_data() returns False. datasource = DataSourceScaleway.DataSourceScaleway( - settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': self.tmp}) + settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": self.tmp}) ) self.assertFalse(datasource.get_data()) - @mock.patch('cloudinit.util.get_cmdline') - @mock.patch('os.path.exists') - @mock.patch('cloudinit.dmi.read_dmi_data') - def test_on_scaleway_dmi(self, m_read_dmi_data, m_file_exists, - m_get_cmdline): + @mock.patch("cloudinit.util.get_cmdline") + @mock.patch("os.path.exists") + @mock.patch("cloudinit.dmi.read_dmi_data") + def test_on_scaleway_dmi( + self, m_read_dmi_data, m_file_exists, m_get_cmdline + ): """ dmidecode returns "Scaleway". """ @@ -115,37 +118,39 @@ class TestOnScaleway(CiTestCase): self.install_mocks( fake_dmi=(m_read_dmi_data, True), fake_file_exists=(m_file_exists, False), - fake_cmdline=(m_get_cmdline, False) + fake_cmdline=(m_get_cmdline, False), ) self.assertTrue(DataSourceScaleway.on_scaleway()) - @mock.patch('cloudinit.util.get_cmdline') - @mock.patch('os.path.exists') - @mock.patch('cloudinit.dmi.read_dmi_data') - def test_on_scaleway_var_run_scaleway(self, m_read_dmi_data, m_file_exists, - m_get_cmdline): + @mock.patch("cloudinit.util.get_cmdline") + @mock.patch("os.path.exists") + @mock.patch("cloudinit.dmi.read_dmi_data") + def test_on_scaleway_var_run_scaleway( + self, m_read_dmi_data, m_file_exists, m_get_cmdline + ): """ /var/run/scaleway exists. """ self.install_mocks( fake_dmi=(m_read_dmi_data, False), fake_file_exists=(m_file_exists, True), - fake_cmdline=(m_get_cmdline, False) + fake_cmdline=(m_get_cmdline, False), ) self.assertTrue(DataSourceScaleway.on_scaleway()) - @mock.patch('cloudinit.util.get_cmdline') - @mock.patch('os.path.exists') - @mock.patch('cloudinit.dmi.read_dmi_data') - def test_on_scaleway_cmdline(self, m_read_dmi_data, m_file_exists, - m_get_cmdline): + @mock.patch("cloudinit.util.get_cmdline") + @mock.patch("os.path.exists") + @mock.patch("cloudinit.dmi.read_dmi_data") + def test_on_scaleway_cmdline( + self, m_read_dmi_data, m_file_exists, m_get_cmdline + ): """ "scaleway" in /proc/cmdline. """ self.install_mocks( fake_dmi=(m_read_dmi_data, False), fake_file_exists=(m_file_exists, False), - fake_cmdline=(m_get_cmdline, True) + fake_cmdline=(m_get_cmdline, True), ) self.assertTrue(DataSourceScaleway.on_scaleway()) @@ -160,65 +165,86 @@ def get_source_address_adapter(*args, **kwargs): This function removes the bind on a privileged address, since anyway the HTTP call is mocked by httpretty. """ - kwargs.pop('source_address') + kwargs.pop("source_address") return requests.adapters.HTTPAdapter(*args, **kwargs) class TestDataSourceScaleway(HttprettyTestCase): - def setUp(self): tmp = self.tmp_dir() self.datasource = DataSourceScaleway.DataSourceScaleway( - settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': tmp}) + settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": tmp}) ) super(TestDataSourceScaleway, self).setUp() - self.metadata_url = \ - DataSourceScaleway.BUILTIN_DS_CONFIG['metadata_url'] - self.userdata_url = \ - DataSourceScaleway.BUILTIN_DS_CONFIG['userdata_url'] - self.vendordata_url = \ - DataSourceScaleway.BUILTIN_DS_CONFIG['vendordata_url'] + self.metadata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[ + "metadata_url" + ] + self.userdata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[ + "userdata_url" + ] + self.vendordata_url = DataSourceScaleway.BUILTIN_DS_CONFIG[ + "vendordata_url" + ] - self.add_patch('cloudinit.sources.DataSourceScaleway.on_scaleway', - '_m_on_scaleway', return_value=True) self.add_patch( - 'cloudinit.sources.DataSourceScaleway.net.find_fallback_nic', - '_m_find_fallback_nic', return_value='scalewaynic0') - - @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4') - @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter', - get_source_address_adapter) - @mock.patch('cloudinit.util.get_cmdline') - @mock.patch('time.sleep', return_value=None) + "cloudinit.sources.DataSourceScaleway.on_scaleway", + "_m_on_scaleway", + return_value=True, + ) + self.add_patch( + "cloudinit.sources.DataSourceScaleway.net.find_fallback_nic", + "_m_find_fallback_nic", + return_value="scalewaynic0", + ) + + @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4") + @mock.patch( + "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter", + get_source_address_adapter, + ) + @mock.patch("cloudinit.util.get_cmdline") + @mock.patch("time.sleep", return_value=None) def test_metadata_ok(self, sleep, m_get_cmdline, dhcpv4): """ get_data() returns metadata, user data and vendor data. """ - m_get_cmdline.return_value = 'scaleway' + m_get_cmdline.return_value = "scaleway" # Make user data API return a valid response - httpretty.register_uri(httpretty.GET, self.metadata_url, - body=MetadataResponses.get_ok) - httpretty.register_uri(httpretty.GET, self.userdata_url, - body=DataResponses.get_ok) - httpretty.register_uri(httpretty.GET, self.vendordata_url, - body=DataResponses.get_ok) + httpretty.register_uri( + httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok + ) + httpretty.register_uri( + httpretty.GET, self.userdata_url, body=DataResponses.get_ok + ) + httpretty.register_uri( + httpretty.GET, self.vendordata_url, body=DataResponses.get_ok + ) self.datasource.get_data() - self.assertEqual(self.datasource.get_instance_id(), - MetadataResponses.FAKE_METADATA['id']) - self.assertEqual(self.datasource.get_public_ssh_keys().sort(), [ - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC', - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD', - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA', - ].sort()) - self.assertEqual(self.datasource.get_hostname(), - MetadataResponses.FAKE_METADATA['hostname']) - self.assertEqual(self.datasource.get_userdata_raw(), - DataResponses.FAKE_USER_DATA) - self.assertEqual(self.datasource.get_vendordata_raw(), - DataResponses.FAKE_USER_DATA) + self.assertEqual( + self.datasource.get_instance_id(), + MetadataResponses.FAKE_METADATA["id"], + ) + self.assertEqual( + self.datasource.get_public_ssh_keys().sort(), + [ + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD", + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", + ].sort(), + ) + self.assertEqual( + self.datasource.get_hostname(), + MetadataResponses.FAKE_METADATA["hostname"], + ) + self.assertEqual( + self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA + ) + self.assertEqual( + self.datasource.get_vendordata_raw(), DataResponses.FAKE_USER_DATA + ) self.assertIsNone(self.datasource.availability_zone) self.assertIsNone(self.datasource.region) self.assertEqual(sleep.call_count, 0) @@ -228,246 +254,273 @@ class TestDataSourceScaleway(HttprettyTestCase): get_public_ssh_keys() should return empty list if no ssh key are available """ - self.datasource.metadata['tags'] = [] - self.datasource.metadata['ssh_public_keys'] = [] + self.datasource.metadata["tags"] = [] + self.datasource.metadata["ssh_public_keys"] = [] self.assertEqual(self.datasource.get_public_ssh_keys(), []) def test_ssh_keys_only_tags(self): """ get_public_ssh_keys() should return list of keys available in tags """ - self.datasource.metadata['tags'] = [ + self.datasource.metadata["tags"] = [ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD", "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABCCCCC", ] - self.datasource.metadata['ssh_public_keys'] = [] - self.assertEqual(self.datasource.get_public_ssh_keys().sort(), [ - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD', - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC', - ].sort()) + self.datasource.metadata["ssh_public_keys"] = [] + self.assertEqual( + self.datasource.get_public_ssh_keys().sort(), + [ + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD", + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", + ].sort(), + ) def test_ssh_keys_only_conf(self): """ get_public_ssh_keys() should return list of keys available in ssh_public_keys field """ - self.datasource.metadata['tags'] = [] - self.datasource.metadata['ssh_public_keys'] = [{ - 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA', - 'fingerprint': '2048 06:ae:... login (RSA)' - }, { - 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC', - 'fingerprint': '2048 06:ff:... login2 (RSA)' - }] - self.assertEqual(self.datasource.get_public_ssh_keys().sort(), [ - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC', - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD', - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA', - ].sort()) + self.datasource.metadata["tags"] = [] + self.datasource.metadata["ssh_public_keys"] = [ + { + "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", + "fingerprint": "2048 06:ae:... login (RSA)", + }, + { + "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", + "fingerprint": "2048 06:ff:... login2 (RSA)", + }, + ] + self.assertEqual( + self.datasource.get_public_ssh_keys().sort(), + [ + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD", + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", + ].sort(), + ) def test_ssh_keys_both(self): """ get_public_ssh_keys() should return a merge of keys available in ssh_public_keys and tags """ - self.datasource.metadata['tags'] = [ + self.datasource.metadata["tags"] = [ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD", ] - self.datasource.metadata['ssh_public_keys'] = [{ - 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA', - 'fingerprint': '2048 06:ae:... login (RSA)' - }, { - 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC', - 'fingerprint': '2048 06:ff:... login2 (RSA)' - }] - self.assertEqual(self.datasource.get_public_ssh_keys().sort(), [ - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC', - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD', - 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA', - ].sort()) - - @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4') - @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter', - get_source_address_adapter) - @mock.patch('cloudinit.util.get_cmdline') - @mock.patch('time.sleep', return_value=None) + self.datasource.metadata["ssh_public_keys"] = [ + { + "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", + "fingerprint": "2048 06:ae:... login (RSA)", + }, + { + "key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", + "fingerprint": "2048 06:ff:... login2 (RSA)", + }, + ] + self.assertEqual( + self.datasource.get_public_ssh_keys().sort(), + [ + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC", + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD", + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA", + ].sort(), + ) + + @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4") + @mock.patch( + "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter", + get_source_address_adapter, + ) + @mock.patch("cloudinit.util.get_cmdline") + @mock.patch("time.sleep", return_value=None) def test_metadata_404(self, sleep, m_get_cmdline, dhcpv4): """ get_data() returns metadata, but no user data nor vendor data. """ - m_get_cmdline.return_value = 'scaleway' + m_get_cmdline.return_value = "scaleway" # Make user and vendor data APIs return HTTP/404, which means there is # no user / vendor data for the server. - httpretty.register_uri(httpretty.GET, self.metadata_url, - body=MetadataResponses.get_ok) - httpretty.register_uri(httpretty.GET, self.userdata_url, - body=DataResponses.empty) - httpretty.register_uri(httpretty.GET, self.vendordata_url, - body=DataResponses.empty) + httpretty.register_uri( + httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok + ) + httpretty.register_uri( + httpretty.GET, self.userdata_url, body=DataResponses.empty + ) + httpretty.register_uri( + httpretty.GET, self.vendordata_url, body=DataResponses.empty + ) self.datasource.get_data() self.assertIsNone(self.datasource.get_userdata_raw()) self.assertIsNone(self.datasource.get_vendordata_raw()) self.assertEqual(sleep.call_count, 0) - @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4') - @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter', - get_source_address_adapter) - @mock.patch('cloudinit.util.get_cmdline') - @mock.patch('time.sleep', return_value=None) + @mock.patch("cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4") + @mock.patch( + "cloudinit.sources.DataSourceScaleway.SourceAddressAdapter", + get_source_address_adapter, + ) + @mock.patch("cloudinit.util.get_cmdline") + @mock.patch("time.sleep", return_value=None) def test_metadata_rate_limit(self, sleep, m_get_cmdline, dhcpv4): """ get_data() is rate limited two times by the metadata API when fetching user data. """ - m_get_cmdline.return_value = 'scaleway' + m_get_cmdline.return_value = "scaleway" - httpretty.register_uri(httpretty.GET, self.metadata_url, - body=MetadataResponses.get_ok) - httpretty.register_uri(httpretty.GET, self.vendordata_url, - body=DataResponses.empty) + httpretty.register_uri( + httpretty.GET, self.metadata_url, body=MetadataResponses.get_ok + ) + httpretty.register_uri( + httpretty.GET, self.vendordata_url, body=DataResponses.empty + ) httpretty.register_uri( - httpretty.GET, self.userdata_url, + httpretty.GET, + self.userdata_url, responses=[ httpretty.Response(body=DataResponses.rate_limited), httpretty.Response(body=DataResponses.rate_limited), httpretty.Response(body=DataResponses.get_ok), - ] + ], ) self.datasource.get_data() - self.assertEqual(self.datasource.get_userdata_raw(), - DataResponses.FAKE_USER_DATA) + self.assertEqual( + self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA + ) self.assertEqual(sleep.call_count, 2) - @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic') - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") + @mock.patch("cloudinit.util.get_cmdline") def test_network_config_ok(self, m_get_cmdline, fallback_nic): """ network_config will only generate IPv4 config if no ipv6 data is available in the metadata """ - m_get_cmdline.return_value = 'scaleway' - fallback_nic.return_value = 'ens2' - self.datasource.metadata['ipv6'] = None + m_get_cmdline.return_value = "scaleway" + fallback_nic.return_value = "ens2" + self.datasource.metadata["ipv6"] = None netcfg = self.datasource.network_config resp = { - 'version': 1, - 'config': [ + "version": 1, + "config": [ { - 'type': 'physical', - 'name': 'ens2', - 'subnets': [{'type': 'dhcp4'}] + "type": "physical", + "name": "ens2", + "subnets": [{"type": "dhcp4"}], } - ] + ], } self.assertEqual(netcfg, resp) - @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic') - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") + @mock.patch("cloudinit.util.get_cmdline") def test_network_config_ipv6_ok(self, m_get_cmdline, fallback_nic): """ network_config will only generate IPv4/v6 configs if ipv6 data is available in the metadata """ - m_get_cmdline.return_value = 'scaleway' - fallback_nic.return_value = 'ens2' - self.datasource.metadata['ipv6'] = { - 'address': '2000:abc:4444:9876::42:999', - 'gateway': '2000:abc:4444:9876::42:000', - 'netmask': '127', + m_get_cmdline.return_value = "scaleway" + fallback_nic.return_value = "ens2" + self.datasource.metadata["ipv6"] = { + "address": "2000:abc:4444:9876::42:999", + "gateway": "2000:abc:4444:9876::42:000", + "netmask": "127", } netcfg = self.datasource.network_config resp = { - 'version': 1, - 'config': [ + "version": 1, + "config": [ { - 'type': 'physical', - 'name': 'ens2', - 'subnets': [ + "type": "physical", + "name": "ens2", + "subnets": [ + {"type": "dhcp4"}, { - 'type': 'dhcp4' + "type": "static", + "address": "2000:abc:4444:9876::42:999", + "gateway": "2000:abc:4444:9876::42:000", + "netmask": "127", }, - { - 'type': 'static', - 'address': '2000:abc:4444:9876::42:999', - 'gateway': '2000:abc:4444:9876::42:000', - 'netmask': '127', - } - ] + ], } - ] + ], } self.assertEqual(netcfg, resp) - @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic') - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") + @mock.patch("cloudinit.util.get_cmdline") def test_network_config_existing(self, m_get_cmdline, fallback_nic): """ network_config() should return the same data if a network config already exists """ - m_get_cmdline.return_value = 'scaleway' - self.datasource._network_config = '0xdeadbeef' + m_get_cmdline.return_value = "scaleway" + self.datasource._network_config = "0xdeadbeef" netcfg = self.datasource.network_config - self.assertEqual(netcfg, '0xdeadbeef') + self.assertEqual(netcfg, "0xdeadbeef") - @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic') - @mock.patch('cloudinit.util.get_cmdline') + @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") + @mock.patch("cloudinit.util.get_cmdline") def test_network_config_unset(self, m_get_cmdline, fallback_nic): """ _network_config will be set to sources.UNSET after the first boot. Make sure it behave correctly. """ - m_get_cmdline.return_value = 'scaleway' - fallback_nic.return_value = 'ens2' - self.datasource.metadata['ipv6'] = None + m_get_cmdline.return_value = "scaleway" + fallback_nic.return_value = "ens2" + self.datasource.metadata["ipv6"] = None self.datasource._network_config = sources.UNSET resp = { - 'version': 1, - 'config': [ + "version": 1, + "config": [ { - 'type': 'physical', - 'name': 'ens2', - 'subnets': [{'type': 'dhcp4'}] + "type": "physical", + "name": "ens2", + "subnets": [{"type": "dhcp4"}], } - ] + ], } netcfg = self.datasource.network_config self.assertEqual(netcfg, resp) - @mock.patch('cloudinit.sources.DataSourceScaleway.LOG.warning') - @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic') - @mock.patch('cloudinit.util.get_cmdline') - def test_network_config_cached_none(self, m_get_cmdline, fallback_nic, - logwarning): + @mock.patch("cloudinit.sources.DataSourceScaleway.LOG.warning") + @mock.patch("cloudinit.sources.DataSourceScaleway.net.find_fallback_nic") + @mock.patch("cloudinit.util.get_cmdline") + def test_network_config_cached_none( + self, m_get_cmdline, fallback_nic, logwarning + ): """ network_config() should return config data if cached data is None rather than sources.UNSET """ - m_get_cmdline.return_value = 'scaleway' - fallback_nic.return_value = 'ens2' - self.datasource.metadata['ipv6'] = None + m_get_cmdline.return_value = "scaleway" + fallback_nic.return_value = "ens2" + self.datasource.metadata["ipv6"] = None self.datasource._network_config = None resp = { - 'version': 1, - 'config': [ + "version": 1, + "config": [ { - 'type': 'physical', - 'name': 'ens2', - 'subnets': [{'type': 'dhcp4'}] + "type": "physical", + "name": "ens2", + "subnets": [{"type": "dhcp4"}], } - ] + ], } netcfg = self.datasource.network_config self.assertEqual(netcfg, resp) - logwarning.assert_called_with('Found None as cached _network_config. ' - 'Resetting to %s', sources.UNSET) + logwarning.assert_called_with( + "Found None as cached _network_config. Resetting to %s", + sources.UNSET, + ) |