diff options
Diffstat (limited to 'tests/integration_tests')
50 files changed, 976 insertions, 854 deletions
diff --git a/tests/integration_tests/__init__.py b/tests/integration_tests/__init__.py index e1d4cd28..81f9b02f 100644 --- a/tests/integration_tests/__init__.py +++ b/tests/integration_tests/__init__.py @@ -7,6 +7,8 @@ def random_mac_address() -> str: The MAC address will have a 1 in its least significant bit, indicating it to be a locally administered address. """ - return "02:00:00:%02x:%02x:%02x" % (random.randint(0, 255), - random.randint(0, 255), - random.randint(0, 255)) + return "02:00:00:%02x:%02x:%02x" % ( + random.randint(0, 255), + random.randint(0, 255), + random.randint(0, 255), + ) diff --git a/tests/integration_tests/bugs/test_gh570.py b/tests/integration_tests/bugs/test_gh570.py index 534cfb9a..ddc74503 100644 --- a/tests/integration_tests/bugs/test_gh570.py +++ b/tests/integration_tests/bugs/test_gh570.py @@ -4,9 +4,10 @@ Test that we can add optional vendor-data to the seedfrom file in a NoCloud environment """ -from tests.integration_tests.instances import IntegrationInstance import pytest +from tests.integration_tests.instances import IntegrationInstance + VENDOR_DATA = """\ #cloud-config runcmd: @@ -19,7 +20,7 @@ runcmd: @pytest.mark.lxd_container @pytest.mark.lxd_vm def test_nocloud_seedfrom_vendordata(client: IntegrationInstance): - seed_dir = '/var/tmp/test_seed_dir' + seed_dir = "/var/tmp/test_seed_dir" result = client.execute( "mkdir {seed_dir} && " "touch {seed_dir}/user-data && " @@ -30,10 +31,10 @@ def test_nocloud_seedfrom_vendordata(client: IntegrationInstance): assert result.return_code == 0 client.write_to_file( - '{}/vendor-data'.format(seed_dir), + "{}/vendor-data".format(seed_dir), VENDOR_DATA, ) - client.execute('cloud-init clean --logs') + client.execute("cloud-init clean --logs") client.restart() - assert client.execute('cloud-init status').ok - assert 'seeded_vendordata_test_file' in client.execute('ls /var/tmp') + assert client.execute("cloud-init status").ok + assert "seeded_vendordata_test_file" in client.execute("ls /var/tmp") diff --git a/tests/integration_tests/bugs/test_gh626.py b/tests/integration_tests/bugs/test_gh626.py index dba01b34..7c720143 100644 --- a/tests/integration_tests/bugs/test_gh626.py +++ b/tests/integration_tests/bugs/test_gh626.py @@ -11,7 +11,6 @@ from tests.integration_tests import random_mac_address from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance - MAC_ADDRESS = random_mac_address() NETWORK_CONFIG = """\ version: 2 @@ -21,7 +20,9 @@ ethernets: wakeonlan: true match: macaddress: {} -""".format(MAC_ADDRESS) +""".format( + MAC_ADDRESS +) EXPECTED_ENI_END = """\ iface eth0 inet dhcp @@ -31,17 +32,19 @@ iface eth0 inet dhcp @pytest.mark.sru_2020_11 @pytest.mark.lxd_container @pytest.mark.lxd_vm -@pytest.mark.lxd_config_dict({ - 'user.network-config': NETWORK_CONFIG, - "volatile.eth0.hwaddr": MAC_ADDRESS, -}) +@pytest.mark.lxd_config_dict( + { + "user.network-config": NETWORK_CONFIG, + "volatile.eth0.hwaddr": MAC_ADDRESS, + } +) def test_wakeonlan(client: IntegrationInstance): - if ImageSpecification.from_os_image().release == 'xenial': - eni = client.execute('cat /etc/network/interfaces.d/50-cloud-init.cfg') + if ImageSpecification.from_os_image().release == "xenial": + eni = client.execute("cat /etc/network/interfaces.d/50-cloud-init.cfg") assert eni.endswith(EXPECTED_ENI_END) return - netplan_cfg = client.execute('cat /etc/netplan/50-cloud-init.yaml') + netplan_cfg = client.execute("cat /etc/netplan/50-cloud-init.yaml") netplan_yaml = yaml.safe_load(netplan_cfg) - assert 'wakeonlan' in netplan_yaml['network']['ethernets']['eth0'] - assert netplan_yaml['network']['ethernets']['eth0']['wakeonlan'] is True + assert "wakeonlan" in netplan_yaml["network"]["ethernets"]["eth0"] + assert netplan_yaml["network"]["ethernets"]["eth0"]["wakeonlan"] is True diff --git a/tests/integration_tests/bugs/test_gh632.py b/tests/integration_tests/bugs/test_gh632.py index f3702a2e..c7a897c6 100644 --- a/tests/integration_tests/bugs/test_gh632.py +++ b/tests/integration_tests/bugs/test_gh632.py @@ -14,18 +14,20 @@ from tests.integration_tests.util import verify_clean_log @pytest.mark.lxd_vm def test_datasource_rbx_no_stacktrace(client: IntegrationInstance): client.write_to_file( - '/etc/cloud/cloud.cfg.d/90_dpkg.cfg', - 'datasource_list: [ RbxCloud, NoCloud ]\n', + "/etc/cloud/cloud.cfg.d/90_dpkg.cfg", + "datasource_list: [ RbxCloud, NoCloud ]\n", ) client.write_to_file( - '/etc/cloud/ds-identify.cfg', - 'policy: enabled\n', + "/etc/cloud/ds-identify.cfg", + "policy: enabled\n", ) - client.execute('cloud-init clean --logs') + client.execute("cloud-init clean --logs") client.restart() - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") verify_clean_log(log) - assert 'Failed to load metadata and userdata' not in log - assert ("Getting data from <class 'cloudinit.sources.DataSourceRbxCloud." - "DataSourceRbxCloud'> failed") not in log + assert "Failed to load metadata and userdata" not in log + assert ( + "Getting data from <class 'cloudinit.sources.DataSourceRbxCloud." + "DataSourceRbxCloud'> failed" not in log + ) diff --git a/tests/integration_tests/bugs/test_gh668.py b/tests/integration_tests/bugs/test_gh668.py index ce57052e..95edb48d 100644 --- a/tests/integration_tests/bugs/test_gh668.py +++ b/tests/integration_tests/bugs/test_gh668.py @@ -10,7 +10,6 @@ import pytest from tests.integration_tests import random_mac_address from tests.integration_tests.instances import IntegrationInstance - DESTINATION_IP = "172.16.0.10" GATEWAY_IP = "10.0.0.100" MAC_ADDRESS = random_mac_address() @@ -26,17 +25,21 @@ ethernets: via: {} match: macaddress: {} -""".format(DESTINATION_IP, GATEWAY_IP, MAC_ADDRESS) +""".format( + DESTINATION_IP, GATEWAY_IP, MAC_ADDRESS +) EXPECTED_ROUTE = "{} via {}".format(DESTINATION_IP, GATEWAY_IP) @pytest.mark.lxd_container @pytest.mark.lxd_vm -@pytest.mark.lxd_config_dict({ - "user.network-config": NETWORK_CONFIG, - "volatile.eth0.hwaddr": MAC_ADDRESS, -}) +@pytest.mark.lxd_config_dict( + { + "user.network-config": NETWORK_CONFIG, + "volatile.eth0.hwaddr": MAC_ADDRESS, + } +) @pytest.mark.lxd_use_exec def test_static_route_to_host(client: IntegrationInstance): route = client.execute("ip route | grep {}".format(DESTINATION_IP)) diff --git a/tests/integration_tests/bugs/test_gh671.py b/tests/integration_tests/bugs/test_gh671.py index 5e90cdda..15f204ee 100644 --- a/tests/integration_tests/bugs/test_gh671.py +++ b/tests/integration_tests/bugs/test_gh671.py @@ -11,13 +11,13 @@ import pytest from tests.integration_tests.clouds import IntegrationCloud -OLD_PASSWORD = 'DoIM33tTheComplexityRequirements!??' -NEW_PASSWORD = 'DoIM33tTheComplexityRequirementsNow!??' +OLD_PASSWORD = "DoIM33tTheComplexityRequirements!??" +NEW_PASSWORD = "DoIM33tTheComplexityRequirementsNow!??" def _check_password(instance, unhashed_password): - shadow_password = instance.execute('getent shadow ubuntu').split(':')[1] - salt = shadow_password.rsplit('$', 1)[0] + shadow_password = instance.execute("getent shadow ubuntu").split(":")[1] + salt = shadow_password.rsplit("$", 1)[0] hashed_password = crypt.crypt(unhashed_password, salt) assert shadow_password == hashed_password @@ -26,29 +26,28 @@ def _check_password(instance, unhashed_password): @pytest.mark.sru_2020_11 def test_update_default_password(setup_image, session_cloud: IntegrationCloud): os_profile = { - 'os_profile': { - 'admin_password': '', - 'linux_configuration': { - 'disable_password_authentication': False - } + "os_profile": { + "admin_password": "", + "linux_configuration": {"disable_password_authentication": False}, } } - os_profile['os_profile']['admin_password'] = OLD_PASSWORD - instance1 = session_cloud.launch(launch_kwargs={'vm_params': os_profile}) + os_profile["os_profile"]["admin_password"] = OLD_PASSWORD + instance1 = session_cloud.launch(launch_kwargs={"vm_params": os_profile}) _check_password(instance1, OLD_PASSWORD) snapshot_id = instance1.cloud.cloud_instance.snapshot( - instance1.instance, - delete_provisioned_user=False + instance1.instance, delete_provisioned_user=False ) - os_profile['os_profile']['admin_password'] = NEW_PASSWORD + os_profile["os_profile"]["admin_password"] = NEW_PASSWORD try: - with session_cloud.launch(launch_kwargs={ - 'image_id': snapshot_id, - 'vm_params': os_profile, - }) as instance2: + with session_cloud.launch( + launch_kwargs={ + "image_id": snapshot_id, + "vm_params": os_profile, + } + ) as instance2: _check_password(instance2, NEW_PASSWORD) finally: session_cloud.cloud_instance.delete_image(snapshot_id) diff --git a/tests/integration_tests/bugs/test_gh868.py b/tests/integration_tests/bugs/test_gh868.py index 1119d461..a62e8b36 100644 --- a/tests/integration_tests/bugs/test_gh868.py +++ b/tests/integration_tests/bugs/test_gh868.py @@ -4,7 +4,6 @@ import pytest from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import verify_clean_log - USERDATA = """\ #cloud-config chef: @@ -24,5 +23,5 @@ chef: @pytest.mark.lxd_vm @pytest.mark.user_data(USERDATA) def test_chef_license(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") verify_clean_log(log) diff --git a/tests/integration_tests/bugs/test_lp1813396.py b/tests/integration_tests/bugs/test_lp1813396.py index 27d41c2b..451a9972 100644 --- a/tests/integration_tests/bugs/test_lp1813396.py +++ b/tests/integration_tests/bugs/test_lp1813396.py @@ -8,7 +8,6 @@ import pytest from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import verify_ordered_items_in_text - USER_DATA = """\ #cloud-config apt: @@ -23,7 +22,7 @@ apt: @pytest.mark.sru_2020_11 @pytest.mark.user_data(USER_DATA) def test_gpg_no_tty(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") to_verify = [ "Running command ['gpg', '--no-tty', " "'--keyserver=keyserver.ubuntu.com', '--recv-keys', 'E4D304DF'] " diff --git a/tests/integration_tests/bugs/test_lp1835584.py b/tests/integration_tests/bugs/test_lp1835584.py index 732f2179..a800eab4 100644 --- a/tests/integration_tests/bugs/test_lp1835584.py +++ b/tests/integration_tests/bugs/test_lp1835584.py @@ -31,12 +31,9 @@ import re import pytest -from tests.integration_tests.instances import IntegrationAzureInstance -from tests.integration_tests.clouds import ( - ImageSpecification, IntegrationCloud -) +from tests.integration_tests.clouds import ImageSpecification, IntegrationCloud from tests.integration_tests.conftest import get_validated_source - +from tests.integration_tests.instances import IntegrationAzureInstance IMG_AZURE_UBUNTU_PRO_FIPS_BIONIC = ( "Canonical:0001-com-ubuntu-pro-bionic-fips:pro-fips-18_04:18.04.202010201" @@ -44,14 +41,12 @@ IMG_AZURE_UBUNTU_PRO_FIPS_BIONIC = ( def _check_iid_insensitive_across_kernel_upgrade( - instance: IntegrationAzureInstance + instance: IntegrationAzureInstance, ): uuid = instance.read_from_file("/sys/class/dmi/id/product_uuid") - assert uuid.isupper(), ( - "Expected uppercase UUID on Ubuntu FIPS image {}".format( - uuid - ) - ) + assert ( + uuid.isupper() + ), "Expected uppercase UUID on Ubuntu FIPS image {}".format(uuid) orig_kernel = instance.execute("uname -r").strip() assert "azure-fips" in orig_kernel result = instance.execute("apt-get update") @@ -80,7 +75,7 @@ def _check_iid_insensitive_across_kernel_upgrade( @pytest.mark.azure @pytest.mark.sru_next def test_azure_kernel_upgrade_case_insensitive_uuid( - session_cloud: IntegrationCloud + session_cloud: IntegrationCloud, ): cfg_image_spec = ImageSpecification.from_os_image() if (cfg_image_spec.os, cfg_image_spec.release) != ("ubuntu", "bionic"): diff --git a/tests/integration_tests/bugs/test_lp1886531.py b/tests/integration_tests/bugs/test_lp1886531.py index 6dd61222..d56ca320 100644 --- a/tests/integration_tests/bugs/test_lp1886531.py +++ b/tests/integration_tests/bugs/test_lp1886531.py @@ -13,7 +13,6 @@ import pytest from tests.integration_tests.util import verify_clean_log - USER_DATA = """\ #cloud-config bootcmd: @@ -22,7 +21,6 @@ bootcmd: class TestLp1886531: - @pytest.mark.user_data(USER_DATA) def test_lp1886531(self, client): log_content = client.read_from_file("/var/log/cloud-init.log") diff --git a/tests/integration_tests/bugs/test_lp1897099.py b/tests/integration_tests/bugs/test_lp1897099.py index 27c8927f..876a2887 100644 --- a/tests/integration_tests/bugs/test_lp1897099.py +++ b/tests/integration_tests/bugs/test_lp1897099.py @@ -7,7 +7,6 @@ https://bugs.launchpad.net/cloud-init/+bug/1897099 import pytest - USER_DATA = """\ #cloud-config bootcmd: @@ -21,11 +20,11 @@ swap: @pytest.mark.sru_2020_11 @pytest.mark.user_data(USER_DATA) -@pytest.mark.no_container('Containers cannot configure swap') +@pytest.mark.no_container("Containers cannot configure swap") def test_fallocate_fallback(client): - log = client.read_from_file('/var/log/cloud-init.log') - assert '/swap.img' in client.execute('cat /proc/swaps') - assert '/swap.img' in client.execute('cat /etc/fstab') - assert 'fallocate swap creation failed, will attempt with dd' in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "/swap.img" in client.execute("cat /proc/swaps") + assert "/swap.img" in client.execute("cat /etc/fstab") + assert "fallocate swap creation failed, will attempt with dd" in log assert "Running command ['dd', 'if=/dev/zero', 'of=/swap.img'" in log - assert 'SUCCESS: config-mounts ran successfully' in log + assert "SUCCESS: config-mounts ran successfully" in log diff --git a/tests/integration_tests/bugs/test_lp1898997.py b/tests/integration_tests/bugs/test_lp1898997.py index 909bc690..115bd34f 100644 --- a/tests/integration_tests/bugs/test_lp1898997.py +++ b/tests/integration_tests/bugs/test_lp1898997.py @@ -33,13 +33,17 @@ ethernets: match: macaddress: {} version: 2 -""".format(MAC_ADDRESS) +""".format( + MAC_ADDRESS +) -@pytest.mark.lxd_config_dict({ - "user.network-config": NETWORK_CONFIG, - "volatile.eth0.hwaddr": MAC_ADDRESS, -}) +@pytest.mark.lxd_config_dict( + { + "user.network-config": NETWORK_CONFIG, + "volatile.eth0.hwaddr": MAC_ADDRESS, + } +) @pytest.mark.lxd_vm @pytest.mark.lxd_use_exec @pytest.mark.not_bionic diff --git a/tests/integration_tests/bugs/test_lp1900837.py b/tests/integration_tests/bugs/test_lp1900837.py index fcc2b751..3df10883 100644 --- a/tests/integration_tests/bugs/test_lp1900837.py +++ b/tests/integration_tests/bugs/test_lp1900837.py @@ -23,7 +23,7 @@ class TestLogPermissionsNotResetOnReboot: # Reboot client.restart() - assert client.execute('cloud-init status').ok + assert client.execute("cloud-init status").ok # Check that permissions are not reset on reboot assert "600" == _get_log_perms(client) diff --git a/tests/integration_tests/bugs/test_lp1901011.py b/tests/integration_tests/bugs/test_lp1901011.py index 2b47f0a8..7de8bd77 100644 --- a/tests/integration_tests/bugs/test_lp1901011.py +++ b/tests/integration_tests/bugs/test_lp1901011.py @@ -10,12 +10,16 @@ from tests.integration_tests.clouds import IntegrationCloud @pytest.mark.azure -@pytest.mark.parametrize('instance_type,is_ephemeral', [ - ('Standard_DS1_v2', True), - ('Standard_D2s_v4', False), -]) -def test_ephemeral(instance_type, is_ephemeral, - session_cloud: IntegrationCloud, setup_image): +@pytest.mark.parametrize( + "instance_type,is_ephemeral", + [ + ("Standard_DS1_v2", True), + ("Standard_D2s_v4", False), + ], +) +def test_ephemeral( + instance_type, is_ephemeral, session_cloud: IntegrationCloud, setup_image +): if is_ephemeral: expected_log = ( "Ephemeral resource disk '/dev/disk/cloud/azure_resource' exists. " @@ -29,30 +33,35 @@ def test_ephemeral(instance_type, is_ephemeral, ) with session_cloud.launch( - launch_kwargs={'instance_type': instance_type} + launch_kwargs={"instance_type": instance_type} ) as client: # Verify log file - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") assert expected_log in log # Verify devices - dev_links = client.execute('ls /dev/disk/cloud') - assert 'azure_root' in dev_links - assert 'azure_root-part1' in dev_links + dev_links = client.execute("ls /dev/disk/cloud") + assert "azure_root" in dev_links + assert "azure_root-part1" in dev_links if is_ephemeral: - assert 'azure_resource' in dev_links - assert 'azure_resource-part1' in dev_links + assert "azure_resource" in dev_links + assert "azure_resource-part1" in dev_links # Verify mounts - blks = client.execute('lsblk -pPo NAME,TYPE,MOUNTPOINT') + blks = client.execute("lsblk -pPo NAME,TYPE,MOUNTPOINT") root_device = client.execute( - 'realpath /dev/disk/cloud/azure_root-part1' + "realpath /dev/disk/cloud/azure_root-part1" + ) + assert ( + 'NAME="{}" TYPE="part" MOUNTPOINT="/"'.format(root_device) in blks ) - assert 'NAME="{}" TYPE="part" MOUNTPOINT="/"'.format( - root_device) in blks if is_ephemeral: ephemeral_device = client.execute( - 'realpath /dev/disk/cloud/azure_resource-part1' + "realpath /dev/disk/cloud/azure_resource-part1" + ) + assert ( + 'NAME="{}" TYPE="part" MOUNTPOINT="/mnt"'.format( + ephemeral_device + ) + in blks ) - assert 'NAME="{}" TYPE="part" MOUNTPOINT="/mnt"'.format( - ephemeral_device) in blks diff --git a/tests/integration_tests/bugs/test_lp1910835.py b/tests/integration_tests/bugs/test_lp1910835.py index 87f92d5e..ddd996f9 100644 --- a/tests/integration_tests/bugs/test_lp1910835.py +++ b/tests/integration_tests/bugs/test_lp1910835.py @@ -19,7 +19,6 @@ will match. """ import pytest - USER_DATA_TMPL = """\ #cloud-config ssh_authorized_keys: diff --git a/tests/integration_tests/bugs/test_lp1912844.py b/tests/integration_tests/bugs/test_lp1912844.py index efafae50..55511ed2 100644 --- a/tests/integration_tests/bugs/test_lp1912844.py +++ b/tests/integration_tests/bugs/test_lp1912844.py @@ -51,7 +51,9 @@ vlans: id: 200 link: ovs-br mtu: 1500 -""".format(MAC_ADDRESS) +""".format( + MAC_ADDRESS +) SETUP_USER_DATA = """\ diff --git a/tests/integration_tests/clouds.py b/tests/integration_tests/clouds.py index dee2adff..d4670bac 100644 --- a/tests/integration_tests/clouds.py +++ b/tests/integration_tests/clouds.py @@ -1,17 +1,17 @@ # This file is part of cloud-init. See LICENSE file for license information. -from abc import ABC, abstractmethod import datetime import logging import os.path import random import string +from abc import ABC, abstractmethod from uuid import UUID from pycloudlib import ( EC2, GCE, - Azure, OCI, + Azure, LXDContainer, LXDVirtualMachine, Openstack, @@ -19,14 +19,15 @@ from pycloudlib import ( from pycloudlib.lxd.instance import LXDInstance import cloudinit -from cloudinit.subp import subp, ProcessExecutionError +from cloudinit.subp import ProcessExecutionError, subp from tests.integration_tests import integration_settings from tests.integration_tests.instances import ( + IntegrationAzureInstance, IntegrationEc2Instance, IntegrationGceInstance, - IntegrationAzureInstance, IntegrationInstance, - IntegrationOciInstance, + IntegrationInstance, IntegrationLxdInstance, + IntegrationOciInstance, ) from tests.integration_tests.util import emit_dots_on_travis @@ -36,7 +37,7 @@ except ImportError: pass -log = logging.getLogger('integration_testing') +log = logging.getLogger("integration_testing") def _get_ubuntu_series() -> list: @@ -149,41 +150,48 @@ class IntegrationCloud(ABC): pycloudlib_instance = self.cloud_instance.launch(**launch_kwargs) return pycloudlib_instance - def launch(self, user_data=None, launch_kwargs=None, - settings=integration_settings, **kwargs): + def launch( + self, + user_data=None, + launch_kwargs=None, + settings=integration_settings, + **kwargs + ): if launch_kwargs is None: launch_kwargs = {} if self.settings.EXISTING_INSTANCE_ID: log.info( - 'Not launching instance due to EXISTING_INSTANCE_ID. ' - 'Instance id: %s', self.settings.EXISTING_INSTANCE_ID) + "Not launching instance due to EXISTING_INSTANCE_ID. " + "Instance id: %s", + self.settings.EXISTING_INSTANCE_ID, + ) self.instance = self.cloud_instance.get_instance( self.settings.EXISTING_INSTANCE_ID ) return self.instance default_launch_kwargs = { - 'image_id': self.image_id, - 'user_data': user_data, + "image_id": self.image_id, + "user_data": user_data, } launch_kwargs = {**default_launch_kwargs, **launch_kwargs} log.info( "Launching instance with launch_kwargs:\n%s", - "\n".join("{}={}".format(*item) for item in launch_kwargs.items()) + "\n".join("{}={}".format(*item) for item in launch_kwargs.items()), ) with emit_dots_on_travis(): pycloudlib_instance = self._perform_launch(launch_kwargs, **kwargs) - log.info('Launched instance: %s', pycloudlib_instance) + log.info("Launched instance: %s", pycloudlib_instance) instance = self.get_instance(pycloudlib_instance, settings) - if launch_kwargs.get('wait', True): + if launch_kwargs.get("wait", True): # If we aren't waiting, we can't rely on command execution here log.info( - 'cloud-init version: %s', - instance.execute("cloud-init --version") + "cloud-init version: %s", + instance.execute("cloud-init --version"), ) serial = instance.execute("grep serial /etc/cloud/build.info") if serial: - log.info('image serial: %s', serial.split()[1]) + log.info("image serial: %s", serial.split()[1]) return instance def get_instance(self, cloud_instance, settings=integration_settings): @@ -199,66 +207,70 @@ class IntegrationCloud(ABC): if self.snapshot_id: if self.settings.KEEP_IMAGE: log.info( - 'NOT deleting snapshot image created for this testrun ' - 'because KEEP_IMAGE is True: %s', self.snapshot_id) + "NOT deleting snapshot image created for this testrun " + "because KEEP_IMAGE is True: %s", + self.snapshot_id, + ) else: log.info( - 'Deleting snapshot image created for this testrun: %s', - self.snapshot_id + "Deleting snapshot image created for this testrun: %s", + self.snapshot_id, ) self.cloud_instance.delete_image(self.snapshot_id) class Ec2Cloud(IntegrationCloud): - datasource = 'ec2' + datasource = "ec2" integration_instance_cls = IntegrationEc2Instance def _get_cloud_instance(self): - return EC2(tag='ec2-integration-test') + return EC2(tag="ec2-integration-test") class GceCloud(IntegrationCloud): - datasource = 'gce' + datasource = "gce" integration_instance_cls = IntegrationGceInstance def _get_cloud_instance(self): return GCE( - tag='gce-integration-test', + tag="gce-integration-test", ) class AzureCloud(IntegrationCloud): - datasource = 'azure' + datasource = "azure" integration_instance_cls = IntegrationAzureInstance def _get_cloud_instance(self): - return Azure(tag='azure-integration-test') + return Azure(tag="azure-integration-test") def destroy(self): if self.settings.KEEP_INSTANCE: log.info( - 'NOT deleting resource group because KEEP_INSTANCE is true ' - 'and deleting resource group would also delete instance. ' - 'Instance and resource group must both be manually deleted.' + "NOT deleting resource group because KEEP_INSTANCE is true " + "and deleting resource group would also delete instance. " + "Instance and resource group must both be manually deleted." ) else: self.cloud_instance.delete_resource_group() class OciCloud(IntegrationCloud): - datasource = 'oci' + datasource = "oci" integration_instance_cls = IntegrationOciInstance def _get_cloud_instance(self): if not integration_settings.ORACLE_AVAILABILITY_DOMAIN: raise Exception( - 'ORACLE_AVAILABILITY_DOMAIN must be set to a valid ' - 'availability domain. If using the oracle CLI, ' - 'try `oci iam availability-domain list`' + "ORACLE_AVAILABILITY_DOMAIN must be set to a valid " + "availability domain. If using the oracle CLI, " + "try `oci iam availability-domain list`" ) return OCI( - tag='oci-integration-test', - availability_domain=integration_settings.ORACLE_AVAILABILITY_DOMAIN + tag="oci-integration-test", + availability_domain=( + integration_settings.ORACLE_AVAILABILITY_DOMAIN, + ), ) @@ -277,38 +289,42 @@ class _LxdIntegrationCloud(IntegrationCloud): def _mount_source(instance: LXDInstance): cloudinit_path = cloudinit.__path__[0] mounts = [ - (cloudinit_path, '/usr/lib/python3/dist-packages/cloudinit'), - (os.path.join(cloudinit_path, '..', 'config', 'cloud.cfg.d'), - '/etc/cloud/cloud.cfg.d'), - (os.path.join(cloudinit_path, '..', 'templates'), - '/etc/cloud/templates'), + (cloudinit_path, "/usr/lib/python3/dist-packages/cloudinit"), + ( + os.path.join(cloudinit_path, "..", "config", "cloud.cfg.d"), + "/etc/cloud/cloud.cfg.d", + ), + ( + os.path.join(cloudinit_path, "..", "templates"), + "/etc/cloud/templates", + ), ] for (n, (source_path, target_path)) in enumerate(mounts): format_variables = { - 'name': instance.name, - 'source_path': os.path.realpath(source_path), - 'container_path': target_path, - 'idx': n, + "name": instance.name, + "source_path": os.path.realpath(source_path), + "container_path": target_path, + "idx": n, } log.info( - 'Mounting source %(source_path)s directly onto LXD' - ' container/VM named %(name)s at %(container_path)s', - format_variables + "Mounting source %(source_path)s directly onto LXD" + " container/VM named %(name)s at %(container_path)s", + format_variables, ) command = ( - 'lxc config device add {name} host-cloud-init-{idx} disk ' - 'source={source_path} ' - 'path={container_path}' + "lxc config device add {name} host-cloud-init-{idx} disk " + "source={source_path} " + "path={container_path}" ).format(**format_variables) subp(command.split()) def _perform_launch(self, launch_kwargs, **kwargs): - launch_kwargs['inst_type'] = launch_kwargs.pop('instance_type', None) - wait = launch_kwargs.pop('wait', True) - release = launch_kwargs.pop('image_id') + launch_kwargs["inst_type"] = launch_kwargs.pop("instance_type", None) + wait = launch_kwargs.pop("wait", True) + release = launch_kwargs.pop("image_id") try: - profile_list = launch_kwargs['profile_list'] + profile_list = launch_kwargs["profile_list"] except KeyError: profile_list = self._get_or_set_profile_list(release) @@ -317,52 +333,53 @@ class _LxdIntegrationCloud(IntegrationCloud): random.choices(string.ascii_lowercase + string.digits, k=8) ) pycloudlib_instance = self.cloud_instance.init( - launch_kwargs.pop('name', default_name), + launch_kwargs.pop("name", default_name), release, profile_list=profile_list, **launch_kwargs ) - if self.settings.CLOUD_INIT_SOURCE == 'IN_PLACE': + if self.settings.CLOUD_INIT_SOURCE == "IN_PLACE": self._mount_source(pycloudlib_instance) - if 'lxd_setup' in kwargs: + if "lxd_setup" in kwargs: log.info("Running callback specified by 'lxd_setup' mark") - kwargs['lxd_setup'](pycloudlib_instance) + kwargs["lxd_setup"](pycloudlib_instance) pycloudlib_instance.start(wait=wait) return pycloudlib_instance class LxdContainerCloud(_LxdIntegrationCloud): - datasource = 'lxd_container' + datasource = "lxd_container" pycloudlib_instance_cls = LXDContainer - instance_tag = 'lxd-container-integration-test' + instance_tag = "lxd-container-integration-test" class LxdVmCloud(_LxdIntegrationCloud): - datasource = 'lxd_vm' + datasource = "lxd_vm" pycloudlib_instance_cls = LXDVirtualMachine - instance_tag = 'lxd-vm-integration-test' + instance_tag = "lxd-vm-integration-test" _profile_list = None def _get_or_set_profile_list(self, release): if self._profile_list: return self._profile_list self._profile_list = self.cloud_instance.build_necessary_profiles( - release) + release + ) return self._profile_list class OpenstackCloud(IntegrationCloud): - datasource = 'openstack' + datasource = "openstack" integration_instance_cls = IntegrationInstance def _get_cloud_instance(self): if not integration_settings.OPENSTACK_NETWORK: raise Exception( - 'OPENSTACK_NETWORK must be set to a valid Openstack network. ' - 'If using the openstack CLI, try `openstack network list`' + "OPENSTACK_NETWORK must be set to a valid Openstack network. " + "If using the openstack CLI, try `openstack network list`" ) return Openstack( - tag='openstack-integration-test', + tag="openstack-integration-test", network=integration_settings.OPENSTACK_NETWORK, ) @@ -372,9 +389,9 @@ class OpenstackCloud(IntegrationCloud): UUID(image.image_id) except ValueError as e: raise Exception( - 'When using Openstack, `OS_IMAGE` MUST be specified with ' - 'a 36-character UUID image ID. Passing in a release name is ' - 'not valid here.\n' - 'OS image id: {}'.format(image.image_id) + "When using Openstack, `OS_IMAGE` MUST be specified with " + "a 36-character UUID image ID. Passing in a release name is " + "not valid here.\n" + "OS image id: {}".format(image.image_id) ) from e return image.image_id diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 5eab5a45..b14b6ad0 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -2,12 +2,13 @@ import datetime import functools import logging -import pytest import os import sys -from tarfile import TarFile from contextlib import contextmanager from pathlib import Path +from tarfile import TarFile + +import pytest from tests.integration_tests import integration_settings from tests.integration_tests.clouds import ( @@ -19,31 +20,30 @@ from tests.integration_tests.clouds import ( LxdContainerCloud, LxdVmCloud, OciCloud, - _LxdIntegrationCloud, OpenstackCloud, + _LxdIntegrationCloud, ) from tests.integration_tests.instances import ( CloudInitSource, IntegrationInstance, ) - -log = logging.getLogger('integration_testing') +log = logging.getLogger("integration_testing") log.addHandler(logging.StreamHandler(sys.stdout)) log.setLevel(logging.INFO) platforms = { - 'ec2': Ec2Cloud, - 'gce': GceCloud, - 'azure': AzureCloud, - 'oci': OciCloud, - 'lxd_container': LxdContainerCloud, - 'lxd_vm': LxdVmCloud, - 'openstack': OpenstackCloud, + "ec2": Ec2Cloud, + "gce": GceCloud, + "azure": AzureCloud, + "oci": OciCloud, + "lxd_container": LxdContainerCloud, + "lxd_vm": LxdVmCloud, + "openstack": OpenstackCloud, } os_list = ["ubuntu"] -session_start_time = datetime.datetime.now().strftime('%y%m%d%H%M%S') +session_start_time = datetime.datetime.now().strftime("%y%m%d%H%M%S") XENIAL_LXD_VM_EXEC_MSG = """\ The default xenial images do not support `exec` for LXD VMs. @@ -69,14 +69,14 @@ def pytest_runtest_setup(item): test_marks = [mark.name for mark in item.iter_markers()] supported_platforms = set(all_platforms).intersection(test_marks) current_platform = integration_settings.PLATFORM - unsupported_message = 'Cannot run on platform {}'.format(current_platform) - if 'no_container' in test_marks: - if 'lxd_container' in test_marks: + unsupported_message = "Cannot run on platform {}".format(current_platform) + if "no_container" in test_marks: + if "lxd_container" in test_marks: raise Exception( - 'lxd_container and no_container marks simultaneously set ' - 'on test' + "lxd_container and no_container marks simultaneously set " + "on test" ) - if current_platform == 'lxd_container': + if current_platform == "lxd_container": pytest.skip(unsupported_message) if supported_platforms and current_platform not in supported_platforms: pytest.skip(unsupported_message) @@ -86,8 +86,8 @@ def pytest_runtest_setup(item): supported_os_set = set(os_list).intersection(test_marks) if current_os and supported_os_set and current_os not in supported_os_set: pytest.skip("Cannot run on OS {}".format(current_os)) - if 'unstable' in test_marks and not integration_settings.RUN_UNSTABLE: - pytest.skip('Test marked unstable. Manually remove mark to run it') + if "unstable" in test_marks and not integration_settings.RUN_UNSTABLE: + pytest.skip("Test marked unstable. Manually remove mark to run it") current_release = image.release if "not_{}".format(current_release) in test_marks: @@ -101,7 +101,7 @@ def disable_subp_usage(request): pass -@pytest.yield_fixture(scope='session') +@pytest.yield_fixture(scope="session") def session_cloud(): if integration_settings.PLATFORM not in platforms.keys(): raise ValueError( @@ -122,28 +122,30 @@ def session_cloud(): def get_validated_source( session_cloud: IntegrationCloud, - source=integration_settings.CLOUD_INIT_SOURCE + source=integration_settings.CLOUD_INIT_SOURCE, ) -> CloudInitSource: - if source == 'NONE': + if source == "NONE": return CloudInitSource.NONE - elif source == 'IN_PLACE': - if session_cloud.datasource not in ['lxd_container', 'lxd_vm']: + elif source == "IN_PLACE": + if session_cloud.datasource not in ["lxd_container", "lxd_vm"]: raise ValueError( - 'IN_PLACE as CLOUD_INIT_SOURCE only works for LXD') + "IN_PLACE as CLOUD_INIT_SOURCE only works for LXD" + ) return CloudInitSource.IN_PLACE - elif source == 'PROPOSED': + elif source == "PROPOSED": return CloudInitSource.PROPOSED - elif source.startswith('ppa:'): + elif source.startswith("ppa:"): return CloudInitSource.PPA elif os.path.isfile(str(source)): return CloudInitSource.DEB_PACKAGE elif source == "UPGRADE": return CloudInitSource.UPGRADE raise ValueError( - 'Invalid value for CLOUD_INIT_SOURCE setting: {}'.format(source)) + "Invalid value for CLOUD_INIT_SOURCE setting: {}".format(source) + ) -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def setup_image(session_cloud: IntegrationCloud): """Setup the target environment with the correct version of cloud-init. @@ -153,17 +155,18 @@ def setup_image(session_cloud: IntegrationCloud): source = get_validated_source(session_cloud) if not source.installs_new_version(): return - log.info('Setting up environment for %s', session_cloud.datasource) + log.info("Setting up environment for %s", session_cloud.datasource) client = session_cloud.launch() client.install_new_cloud_init(source) # Even if we're keeping instances, we don't want to keep this # one around as it was just for image creation client.destroy() - log.info('Done with environment setup') + log.info("Done with environment setup") -def _collect_logs(instance: IntegrationInstance, node_id: str, - test_failed: bool): +def _collect_logs( + instance: IntegrationInstance, node_id: str, test_failed: bool +): """Collect logs from remote instance. Args: @@ -172,36 +175,43 @@ def _collect_logs(instance: IntegrationInstance, node_id: str, tests/integration_tests/test_example.py::TestExample.test_example test_failed: If test failed or not """ - if any([ - integration_settings.COLLECT_LOGS == 'NEVER', - integration_settings.COLLECT_LOGS == 'ON_ERROR' and not test_failed - ]): + if any( + [ + integration_settings.COLLECT_LOGS == "NEVER", + integration_settings.COLLECT_LOGS == "ON_ERROR" + and not test_failed, + ] + ): return instance.execute( - 'cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz') + "cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz" + ) node_id_path = Path( - node_id - .replace('.py', '') # Having a directory with '.py' would be weird - .replace('::', os.path.sep) # Turn classes/tests into paths - .replace('[', '-') # For parametrized names - .replace(']', '') # For parameterized names + node_id.replace( + ".py", "" + ) # Having a directory with '.py' would be weird + .replace("::", os.path.sep) # Turn classes/tests into paths + .replace("[", "-") # For parametrized names + .replace("]", "") # For parameterized names + ) + log_dir = ( + Path(integration_settings.LOCAL_LOG_PATH) + / session_start_time + / node_id_path ) - log_dir = Path( - integration_settings.LOCAL_LOG_PATH - ) / session_start_time / node_id_path log.info("Writing logs to %s", log_dir) if not log_dir.exists(): log_dir.mkdir(parents=True) # Add a symlink to the latest log output directory - last_symlink = Path(integration_settings.LOCAL_LOG_PATH) / 'last' + last_symlink = Path(integration_settings.LOCAL_LOG_PATH) / "last" if os.path.islink(last_symlink): os.unlink(last_symlink) os.symlink(log_dir.parent, last_symlink) - tarball_path = log_dir / 'cloud-init.tar.gz' - instance.pull_file('/var/tmp/cloud-init.tar.gz', tarball_path) + tarball_path = log_dir / "cloud-init.tar.gz" + instance.pull_file("/var/tmp/cloud-init.tar.gz", tarball_path) tarball = TarFile.open(str(tarball_path)) tarball.extractall(path=str(log_dir)) @@ -218,12 +228,12 @@ def _client(request, fixture_utils, session_cloud: IntegrationCloud): getter = functools.partial( fixture_utils.closest_marker_first_arg_or, request, default=None ) - user_data = getter('user_data') - name = getter('instance_name') - lxd_config_dict = getter('lxd_config_dict') - lxd_setup = getter('lxd_setup') + user_data = getter("user_data") + name = getter("instance_name") + lxd_config_dict = getter("lxd_config_dict") + lxd_setup = getter("lxd_setup") lxd_use_exec = fixture_utils.closest_marker_args_or( - request, 'lxd_use_exec', None + request, "lxd_use_exec", None ) launch_kwargs = {} @@ -250,8 +260,8 @@ def _client(request, fixture_utils, session_cloud: IntegrationCloud): local_launch_kwargs = {} if lxd_setup is not None: if not isinstance(session_cloud, _LxdIntegrationCloud): - pytest.skip('lxd_setup requires LXD') - local_launch_kwargs['lxd_setup'] = lxd_setup + pytest.skip("lxd_setup requires LXD") + local_launch_kwargs["lxd_setup"] = lxd_setup with session_cloud.launch( user_data=user_data, launch_kwargs=launch_kwargs, **local_launch_kwargs @@ -273,14 +283,14 @@ def client(request, fixture_utils, session_cloud, setup_image): yield client -@pytest.yield_fixture(scope='module') +@pytest.yield_fixture(scope="module") def module_client(request, fixture_utils, session_cloud, setup_image): """Provide a client that runs once per module.""" with _client(request, fixture_utils, session_cloud) as client: yield client -@pytest.yield_fixture(scope='class') +@pytest.yield_fixture(scope="class") def class_client(request, fixture_utils, session_cloud, setup_image): """Provide a client that runs once per class.""" with _client(request, fixture_utils, session_cloud) as client: diff --git a/tests/integration_tests/datasources/test_lxd_discovery.py b/tests/integration_tests/datasources/test_lxd_discovery.py index 3f05e906..da010813 100644 --- a/tests/integration_tests/datasources/test_lxd_discovery.py +++ b/tests/integration_tests/datasources/test_lxd_discovery.py @@ -1,4 +1,5 @@ import json + import pytest import yaml @@ -9,10 +10,10 @@ from tests.integration_tests.util import verify_clean_log def _customize_envionment(client: IntegrationInstance): client.write_to_file( - '/etc/cloud/cloud.cfg.d/99-detect-lxd.cfg', - 'datasource_list: [LXD]\n', + "/etc/cloud/cloud.cfg.d/99-detect-lxd.cfg", + "datasource_list: [LXD]\n", ) - client.execute('cloud-init clean --logs') + client.execute("cloud-init clean --logs") client.restart() @@ -25,40 +26,44 @@ def test_lxd_datasource_discovery(client: IntegrationInstance): """Test that DataSourceLXD is detected instead of NoCloud.""" _customize_envionment(client) nic_dev = "enp5s0" if client.settings.PLATFORM == "lxd_vm" else "eth0" - result = client.execute('cloud-init status --long') + result = client.execute("cloud-init status --long") if not result.ok: - raise AssertionError('cloud-init failed:\n%s', result.stderr) + raise AssertionError("cloud-init failed:\n%s", result.stderr) if "DataSourceLXD" not in result.stdout: raise AssertionError( - 'cloud-init did not discover DataSourceLXD', result.stdout + "cloud-init did not discover DataSourceLXD", result.stdout ) - netplan_yaml = client.execute('cat /etc/netplan/50-cloud-init.yaml') + netplan_yaml = client.execute("cat /etc/netplan/50-cloud-init.yaml") netplan_cfg = yaml.safe_load(netplan_yaml) assert { - 'network': {'ethernets': {nic_dev: {'dhcp4': True}}, 'version': 2} + "network": {"ethernets": {nic_dev: {"dhcp4": True}}, "version": 2} } == netplan_cfg - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") verify_clean_log(log) - result = client.execute('cloud-id') + result = client.execute("cloud-id") if result.stdout != "lxd": raise AssertionError( "cloud-id didn't report lxd. Result: %s", result.stdout ) # Validate config instance data represented - data = json.loads(client.read_from_file( - '/run/cloud-init/instance-data.json') + data = json.loads( + client.read_from_file("/run/cloud-init/instance-data.json") ) v1 = data["v1"] ds_cfg = data["ds"] assert "lxd" == v1["platform"] assert "LXD socket API v. 1.0 (/dev/lxd/sock)" == v1["subplatform"] - ds_cfg = json.loads(client.execute('cloud-init query ds').stdout) + ds_cfg = json.loads(client.execute("cloud-init query ds").stdout) assert ["_doc", "_metadata_api_version", "config", "meta-data"] == sorted( list(ds_cfg.keys()) ) if ( - client.settings.PLATFORM == "lxd_vm" and - ImageSpecification.from_os_image().release in ("xenial", "bionic") + client.settings.PLATFORM == "lxd_vm" + and ImageSpecification.from_os_image().release + in ( + "xenial", + "bionic", + ) ): # pycloudlib injects user.vendor_data for lxd_vm on bionic and xenial # to start the lxd-agent. @@ -74,17 +79,13 @@ def test_lxd_datasource_discovery(client: IntegrationInstance): assert {"public-keys": v1["public_ssh_keys"][0]} == ( yaml.safe_load(ds_cfg["config"]["user.meta-data"]) ) - assert ( - "#cloud-config\ninstance-id" in ds_cfg["meta-data"] - ) + assert "#cloud-config\ninstance-id" in ds_cfg["meta-data"] # Assert NoCloud seed data is still present in cloud image metadata # This will start failing if we redact metadata templates from # https://cloud-images.ubuntu.com/daily/server/jammy/current/\ # jammy-server-cloudimg-amd64-lxd.tar.xz nocloud_metadata = yaml.safe_load( - client.read_from_file( - "/var/lib/cloud/seed/nocloud-net/meta-data" - ) + client.read_from_file("/var/lib/cloud/seed/nocloud-net/meta-data") ) assert client.instance.name == nocloud_metadata["instance-id"] assert ( diff --git a/tests/integration_tests/datasources/test_network_dependency.py b/tests/integration_tests/datasources/test_network_dependency.py index 24e71f9d..32ac7053 100644 --- a/tests/integration_tests/datasources/test_network_dependency.py +++ b/tests/integration_tests/datasources/test_network_dependency.py @@ -6,10 +6,10 @@ from tests.integration_tests.instances import IntegrationInstance def _customize_envionment(client: IntegrationInstance): # Insert our "disable_network_activation" file here client.write_to_file( - '/etc/cloud/cloud.cfg.d/99-disable-network-activation.cfg', - 'disable_network_activation: true\n', + "/etc/cloud/cloud.cfg.d/99-disable-network-activation.cfg", + "disable_network_activation: true\n", ) - client.execute('cloud-init clean --logs') + client.execute("cloud-init clean --logs") client.restart() @@ -20,13 +20,14 @@ def _customize_envionment(client: IntegrationInstance): def test_network_activation_disabled(client: IntegrationInstance): """Test that the network is not activated during init mode.""" _customize_envionment(client) - result = client.execute('systemctl status google-guest-agent.service') + result = client.execute("systemctl status google-guest-agent.service") if not result.ok: raise AssertionError( - 'google-guest-agent is not active:\n%s', result.stdout) - log = client.read_from_file('/var/log/cloud-init.log') + "google-guest-agent is not active:\n%s", result.stdout + ) + log = client.read_from_file("/var/log/cloud-init.log") assert "Running command ['netplan', 'apply']" not in log - assert 'Not bringing up newly configured network interfaces' in log - assert 'Bringing up newly configured network interfaces' not in log + assert "Not bringing up newly configured network interfaces" in log + assert "Bringing up newly configured network interfaces" not in log diff --git a/tests/integration_tests/instances.py b/tests/integration_tests/instances.py index 8f66bf43..793f729e 100644 --- a/tests/integration_tests/instances.py +++ b/tests/integration_tests/instances.py @@ -1,8 +1,8 @@ # This file is part of cloud-init. See LICENSE file for license information. -from enum import Enum import logging import os import uuid +from enum import Enum from tempfile import NamedTemporaryFile from pycloudlib.instance import BaseInstance @@ -13,20 +13,21 @@ from tests.integration_tests.util import retry try: from typing import TYPE_CHECKING + if TYPE_CHECKING: from tests.integration_tests.clouds import ( # noqa: F401 - IntegrationCloud + IntegrationCloud, ) except ImportError: pass -log = logging.getLogger('integration_testing') +log = logging.getLogger("integration_testing") def _get_tmp_path(): tmp_filename = str(uuid.uuid4()) - return '/var/tmp/{}.tmp'.format(tmp_filename) + return "/var/tmp/{}.tmp".format(tmp_filename) class CloudInitSource(Enum): @@ -37,6 +38,7 @@ class CloudInitSource(Enum): explanation of these values. If the value set there can't be parsed into one of these values, an exception will be raised """ + NONE = 1 IN_PLACE = 2 PROPOSED = 3 @@ -51,8 +53,12 @@ class CloudInitSource(Enum): class IntegrationInstance: - def __init__(self, cloud: 'IntegrationCloud', instance: BaseInstance, - settings=integration_settings): + def __init__( + self, + cloud: "IntegrationCloud", + instance: BaseInstance, + settings=integration_settings, + ): self.cloud = cloud self.instance = instance self.settings = settings @@ -69,41 +75,44 @@ class IntegrationInstance: self.instance.restart() def execute(self, command, *, use_sudo=True) -> Result: - if self.instance.username == 'root' and use_sudo is False: - raise Exception('Root user cannot run unprivileged') + if self.instance.username == "root" and use_sudo is False: + raise Exception("Root user cannot run unprivileged") return self.instance.execute(command, use_sudo=use_sudo) def pull_file(self, remote_path, local_path): # First copy to a temporary directory because of permissions issues tmp_path = _get_tmp_path() - self.instance.execute('cp {} {}'.format(str(remote_path), tmp_path)) + self.instance.execute("cp {} {}".format(str(remote_path), tmp_path)) self.instance.pull_file(tmp_path, str(local_path)) def push_file(self, local_path, remote_path): # First push to a temporary directory because of permissions issues tmp_path = _get_tmp_path() self.instance.push_file(str(local_path), tmp_path) - self.execute('mv {} {}'.format(tmp_path, str(remote_path))) + self.execute("mv {} {}".format(tmp_path, str(remote_path))) def read_from_file(self, remote_path) -> str: - result = self.execute('cat {}'.format(remote_path)) + result = self.execute("cat {}".format(remote_path)) if result.failed: # TODO: Raise here whatever pycloudlib raises when it has # a consistent error response raise IOError( - 'Failed reading remote file via cat: {}\n' - 'Return code: {}\n' - 'Stderr: {}\n' - 'Stdout: {}'.format( - remote_path, result.return_code, - result.stderr, result.stdout) + "Failed reading remote file via cat: {}\n" + "Return code: {}\n" + "Stderr: {}\n" + "Stdout: {}".format( + remote_path, + result.return_code, + result.stderr, + result.stdout, + ) ) return result.stdout def write_to_file(self, remote_path, contents: str): # Writes file locally and then pushes it rather # than writing the file directly on the instance - with NamedTemporaryFile('w', delete=False) as tmp_file: + with NamedTemporaryFile("w", delete=False) as tmp_file: tmp_file.write(contents) try: @@ -113,7 +122,7 @@ class IntegrationInstance: def snapshot(self): image_id = self.cloud.snapshot(self.instance) - log.info('Created new image: %s', image_id) + log.info("Created new image: %s", image_id) return image_id def install_new_cloud_init( @@ -133,10 +142,11 @@ class IntegrationInstance: else: raise Exception( "Specified to install {} which isn't supported here".format( - source) + source + ) ) - version = self.execute('cloud-init -v').split()[-1] - log.info('Installed cloud-init version: %s', version) + version = self.execute("cloud-init -v").split()[-1] + log.info("Installed cloud-init version: %s", version) if clean: self.instance.clean() if take_snapshot: @@ -149,38 +159,39 @@ class IntegrationInstance: @retry(tries=30, delay=1) def install_proposed_image(self): - log.info('Installing proposed image') + log.info("Installing proposed image") assert self.execute( 'echo deb "http://archive.ubuntu.com/ubuntu ' '$(lsb_release -sc)-proposed main" >> ' - '/etc/apt/sources.list.d/proposed.list' + "/etc/apt/sources.list.d/proposed.list" ).ok - assert self.execute('apt-get update -q').ok - assert self.execute('apt-get install -qy cloud-init').ok + assert self.execute("apt-get update -q").ok + assert self.execute("apt-get install -qy cloud-init").ok @retry(tries=30, delay=1) def install_ppa(self): - log.info('Installing PPA') - assert self.execute('add-apt-repository {} -y'.format( - self.settings.CLOUD_INIT_SOURCE) + log.info("Installing PPA") + assert self.execute( + "add-apt-repository {} -y".format(self.settings.CLOUD_INIT_SOURCE) ).ok - assert self.execute('apt-get update -q').ok - assert self.execute('apt-get install -qy cloud-init').ok + assert self.execute("apt-get update -q").ok + assert self.execute("apt-get install -qy cloud-init").ok @retry(tries=30, delay=1) def install_deb(self): - log.info('Installing deb package') + log.info("Installing deb package") deb_path = integration_settings.CLOUD_INIT_SOURCE deb_name = os.path.basename(deb_path) - remote_path = '/var/tmp/{}'.format(deb_name) + remote_path = "/var/tmp/{}".format(deb_name) self.push_file( local_path=integration_settings.CLOUD_INIT_SOURCE, - remote_path=remote_path) - assert self.execute('dpkg -i {path}'.format(path=remote_path)).ok + remote_path=remote_path, + ) + assert self.execute("dpkg -i {path}".format(path=remote_path)).ok @retry(tries=30, delay=1) def upgrade_cloud_init(self): - log.info('Upgrading cloud-init to latest version in archive') + log.info("Upgrading cloud-init to latest version in archive") assert self.execute("apt-get update -q").ok assert self.execute("apt-get install -qy cloud-init").ok diff --git a/tests/integration_tests/integration_settings.py b/tests/integration_tests/integration_settings.py index e4a790c2..641ce297 100644 --- a/tests/integration_tests/integration_settings.py +++ b/tests/integration_tests/integration_settings.py @@ -1,6 +1,5 @@ # This file is part of cloud-init. See LICENSE file for license information. import os - from distutils.util import strtobool ################################################################## @@ -22,7 +21,7 @@ RUN_UNSTABLE = False # gce # oci # openstack -PLATFORM = 'lxd_container' +PLATFORM = "lxd_container" # The cloud-specific instance type to run. E.g., a1.medium on AWS # If the pycloudlib instance provides a default, this can be left None @@ -34,7 +33,7 @@ INSTANCE_TYPE = None # <image_id>[::<os>[::<release>]]. If given, os and release should describe # the image specified by image_id. (Ubuntu releases are converted to this # format internally; in this case, to "focal::ubuntu::focal".) -OS_IMAGE = 'focal' +OS_IMAGE = "focal" # Populate if you want to use a pre-launched instance instead of # creating a new one. The exact contents will be platform dependent @@ -66,7 +65,7 @@ EXISTING_INSTANCE_ID = None # Install from a PPA. It MUST start with 'ppa:' # <file path> # A path to a valid package to be uploaded and installed -CLOUD_INIT_SOURCE = 'NONE' +CLOUD_INIT_SOURCE = "NONE" # Before an instance is torn down, we run `cloud-init collect-logs` # and transfer them locally. These settings specify when to collect these @@ -75,8 +74,8 @@ CLOUD_INIT_SOURCE = 'NONE' # 'ALWAYS' # 'ON_ERROR' # 'NEVER' -COLLECT_LOGS = 'ON_ERROR' -LOCAL_LOG_PATH = '/tmp/cloud_init_test_logs' +COLLECT_LOGS = "ON_ERROR" +LOCAL_LOG_PATH = "/tmp/cloud_init_test_logs" ################################################################## # SSH KEY SETTINGS @@ -124,7 +123,7 @@ except ImportError: current_settings = [var for var in locals() if var.isupper()] for setting in current_settings: env_setting = os.getenv( - 'CLOUD_INIT_{}'.format(setting), globals()[setting] + "CLOUD_INIT_{}".format(setting), globals()[setting] ) if isinstance(env_setting, str): try: diff --git a/tests/integration_tests/modules/test_apt.py b/tests/integration_tests/modules/test_apt.py index f5f6c813..48f398d1 100644 --- a/tests/integration_tests/modules/test_apt.py +++ b/tests/integration_tests/modules/test_apt.py @@ -3,12 +3,11 @@ import re import pytest -from cloudinit.config import cc_apt_configure from cloudinit import gpg +from cloudinit.config import cc_apt_configure from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance - USER_DATA = """\ #cloud-config apt: @@ -104,14 +103,15 @@ class TestApt: """Return all keys in /etc/apt/trusted.gpg.d/ and /etc/apt/trusted.gpg in human readable format. Mimics the output of apt-key finger """ - list_cmd = ' '.join(gpg.GPG_LIST) + ' ' + list_cmd = " ".join(gpg.GPG_LIST) + " " keys = class_client.execute(list_cmd + cc_apt_configure.APT_LOCAL_KEYS) print(keys) files = class_client.execute( - 'ls ' + cc_apt_configure.APT_TRUSTED_GPG_DIR) + "ls " + cc_apt_configure.APT_TRUSTED_GPG_DIR + ) for file in files.split(): path = cc_apt_configure.APT_TRUSTED_GPG_DIR + file - keys += class_client.execute(list_cmd + path) or '' + keys += class_client.execute(list_cmd + path) or "" return keys def test_sources_list(self, class_client: IntegrationInstance): @@ -124,8 +124,8 @@ class TestApt: (This is ported from `tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml`.) """ - sources_list = class_client.read_from_file('/etc/apt/sources.list') - assert 6 == len(sources_list.rstrip().split('\n')) + sources_list = class_client.read_from_file("/etc/apt/sources.list") + assert 6 == len(sources_list.rstrip().split("\n")) for expected_re in EXPECTED_REGEXES: assert re.search(expected_re, sources_list) is not None @@ -136,7 +136,7 @@ class TestApt: Ported from tests/cloud_tests/testcases/modules/apt_configure_conf.py """ apt_config = class_client.read_from_file( - '/etc/apt/apt.conf.d/94cloud-init-config' + "/etc/apt/apt.conf.d/94cloud-init-config" ) assert 'Assume-Yes "true";' in apt_config assert 'Fix-Broken "true";' in apt_config @@ -149,40 +149,43 @@ class TestApt: """ release = ImageSpecification.from_os_image().release ppa_path_contents = class_client.read_from_file( - '/etc/apt/sources.list.d/' - 'simplestreams-dev-ubuntu-trunk-{}.list'.format(release) + "/etc/apt/sources.list.d/" + "simplestreams-dev-ubuntu-trunk-{}.list".format(release) ) assert ( - 'http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu' - ) in ppa_path_contents + "http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu" + in ppa_path_contents + ) assert TEST_PPA_KEY in self.get_keys(class_client) def test_signed_by(self, class_client: IntegrationInstance): - """Test the apt signed-by functionality. - """ + """Test the apt signed-by functionality.""" release = ImageSpecification.from_os_image().release source = ( "deb [signed-by=/etc/apt/cloud-init.gpg.d/test_signed_by.gpg] " "http://ppa.launchpad.net/juju/stable/ubuntu" - " {} main".format(release)) + " {} main".format(release) + ) path_contents = class_client.read_from_file( - '/etc/apt/sources.list.d/test_signed_by.list') + "/etc/apt/sources.list.d/test_signed_by.list" + ) assert path_contents == source key = class_client.execute( - 'gpg --no-default-keyring --with-fingerprint --list-keys ' - '--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg') + "gpg --no-default-keyring --with-fingerprint --list-keys " + "--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg" + ) assert TEST_SIGNED_BY_KEY in key def test_bad_key(self, class_client: IntegrationInstance): - """Test the apt signed-by functionality. - """ + """Test the apt signed-by functionality.""" with pytest.raises(OSError): class_client.read_from_file( - '/etc/apt/trusted.list.d/test_bad_key.gpg') + "/etc/apt/trusted.list.d/test_bad_key.gpg" + ) def test_key(self, class_client: IntegrationInstance): """Test the apt key functionality. @@ -191,12 +194,13 @@ class TestApt: tests/cloud_tests/testcases/modules/apt_configure_sources_key.py """ test_archive_contents = class_client.read_from_file( - '/etc/apt/sources.list.d/test_key.list' + "/etc/apt/sources.list.d/test_key.list" ) assert ( - 'http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu' - ) in test_archive_contents + "http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu" + in test_archive_contents + ) assert TEST_KEY in self.get_keys(class_client) def test_keyserver(self, class_client: IntegrationInstance): @@ -206,12 +210,13 @@ class TestApt: tests/cloud_tests/testcases/modules/apt_configure_sources_keyserver.py """ test_keyserver_contents = class_client.read_from_file( - '/etc/apt/sources.list.d/test_keyserver.list' + "/etc/apt/sources.list.d/test_keyserver.list" ) assert ( - 'http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu' - ) in test_keyserver_contents + "http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu" + in test_keyserver_contents + ) assert TEST_KEYSERVER_KEY in self.get_keys(class_client) @@ -221,7 +226,7 @@ class TestApt: Ported from tests/cloud_tests/testcases/modules/apt_pipelining_os.py """ conf_exists = class_client.execute( - 'test -f /etc/apt/apt.conf.d/90cloud-init-pipelining' + "test -f /etc/apt/apt.conf.d/90cloud-init-pipelining" ).ok assert conf_exists is False @@ -237,7 +242,7 @@ apt: - arches: - default """ -DEFAULT_DATA = _DEFAULT_DATA.format(uri='') +DEFAULT_DATA = _DEFAULT_DATA.format(uri="") @pytest.mark.ubuntu @@ -249,9 +254,9 @@ class TestDefaults: When no uri is provided. """ - zone = class_client.execute('cloud-init query v1.availability_zone') - sources_list = class_client.read_from_file('/etc/apt/sources.list') - assert '{}.clouds.archive.ubuntu.com'.format(zone) in sources_list + zone = class_client.execute("cloud-init query v1.availability_zone") + sources_list = class_client.read_from_file("/etc/apt/sources.list") + assert "{}.clouds.archive.ubuntu.com".format(zone) in sources_list def test_security(self, class_client: IntegrationInstance): """Test apt default security sources. @@ -259,12 +264,12 @@ class TestDefaults: Ported from tests/cloud_tests/testcases/modules/apt_configure_security.py """ - sources_list = class_client.read_from_file('/etc/apt/sources.list') + sources_list = class_client.read_from_file("/etc/apt/sources.list") # 3 lines from main, universe, and multiverse - assert 3 == sources_list.count('deb http://security.ubuntu.com/ubuntu') + assert 3 == sources_list.count("deb http://security.ubuntu.com/ubuntu") assert 3 == sources_list.count( - '# deb-src http://security.ubuntu.com/ubuntu' + "# deb-src http://security.ubuntu.com/ubuntu" ) @@ -280,10 +285,10 @@ def test_default_primary_with_uri(client: IntegrationInstance): Ported from tests/cloud_tests/testcases/modules/apt_configure_primary.py """ - sources_list = client.read_from_file('/etc/apt/sources.list') - assert 'archive.ubuntu.com' not in sources_list + sources_list = client.read_from_file("/etc/apt/sources.list") + assert "archive.ubuntu.com" not in sources_list - assert 'something.random.invalid' in sources_list + assert "something.random.invalid" in sources_list DISABLED_DATA = """\ @@ -310,7 +315,7 @@ class TestDisabled: sources_list = class_client.execute( "cat /etc/apt/sources.list | grep -v '^#'" ).strip() - assert '' == sources_list + assert "" == sources_list def test_disable_apt_pipelining(self, class_client: IntegrationInstance): """Test disabling of apt pipelining. @@ -319,7 +324,7 @@ class TestDisabled: tests/cloud_tests/testcases/modules/apt_pipelining_disable.py """ conf = class_client.read_from_file( - '/etc/apt/apt.conf.d/90cloud-init-pipelining' + "/etc/apt/apt.conf.d/90cloud-init-pipelining" ) assert 'Acquire::http::Pipeline-Depth "0";' in conf @@ -338,8 +343,7 @@ apt: @pytest.mark.user_data(APT_PROXY_DATA) def test_apt_proxy(client: IntegrationInstance): """Test the apt proxy data gets written correctly.""" - out = client.read_from_file( - '/etc/apt/apt.conf.d/90cloud-init-aptproxy') + out = client.read_from_file("/etc/apt/apt.conf.d/90cloud-init-aptproxy") assert 'Acquire::http::Proxy "http://proxy.internal:3128";' in out assert 'Acquire::http::Proxy "http://squid.internal:3128";' in out assert 'Acquire::ftp::Proxy "ftp://squid.internal:3128";' in out diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py index 89c01a9c..d514fc62 100644 --- a/tests/integration_tests/modules/test_ca_certs.py +++ b/tests/integration_tests/modules/test_ca_certs.py @@ -10,7 +10,6 @@ import os.path import pytest - USER_DATA = """\ #cloud-config ca-certs: diff --git a/tests/integration_tests/modules/test_cli.py b/tests/integration_tests/modules/test_cli.py index 3f41b34d..97bfe52d 100644 --- a/tests/integration_tests/modules/test_cli.py +++ b/tests/integration_tests/modules/test_cli.py @@ -7,7 +7,6 @@ import pytest from tests.integration_tests.instances import IntegrationInstance - VALID_USER_DATA = """\ #cloud-config runcmd: @@ -27,9 +26,9 @@ def test_valid_userdata(client: IntegrationInstance): PR #575 """ - result = client.execute('cloud-init devel schema --system') + result = client.execute("cloud-init devel schema --system") assert result.ok - assert 'Valid cloud-config: system userdata' == result.stdout.strip() + assert "Valid cloud-config: system userdata" == result.stdout.strip() @pytest.mark.sru_2020_11 @@ -39,7 +38,7 @@ def test_invalid_userdata(client: IntegrationInstance): PR #575 """ - result = client.execute('cloud-init devel schema --system') + result = client.execute("cloud-init devel schema --system") assert not result.ok - assert 'Cloud config schema errors' in result.stderr + assert "Cloud config schema errors" in result.stderr assert 'needs to begin with "#cloud-config"' in result.stderr diff --git a/tests/integration_tests/modules/test_combined.py b/tests/integration_tests/modules/test_combined.py index 26a8397d..c88f40d3 100644 --- a/tests/integration_tests/modules/test_combined.py +++ b/tests/integration_tests/modules/test_combined.py @@ -6,9 +6,10 @@ the same instance launch. Most independent module coherence tests can go here. """ import json -import pytest import re +import pytest + from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import ( @@ -76,7 +77,7 @@ class TestCombined: Also tests LP 1511485: final_message is silent. """ client = class_client - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") expected = ( "This is my final message!\n" r"\d+\.\d+.*\n" @@ -94,10 +95,10 @@ class TestCombined: configuring the archives. """ client = class_client - log = client.read_from_file('/var/log/cloud-init.log') - assert 'W: Failed to fetch' not in log - assert 'W: Some index files failed to download' not in log - assert 'E: Unable to locate package ntp' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "W: Failed to fetch" not in log + assert "W: Some index files failed to download" not in log + assert "E: Unable to locate package ntp" not in log def test_byobu(self, class_client: IntegrationInstance): """Test byobu configured as enabled by default.""" @@ -107,22 +108,18 @@ class TestCombined: def test_configured_locale(self, class_client: IntegrationInstance): """Test locale can be configured correctly.""" client = class_client - default_locale = client.read_from_file('/etc/default/locale') - assert 'LANG=en_GB.UTF-8' in default_locale + default_locale = client.read_from_file("/etc/default/locale") + assert "LANG=en_GB.UTF-8" in default_locale - locale_a = client.execute('locale -a') - verify_ordered_items_in_text([ - 'en_GB.utf8', - 'en_US.utf8' - ], locale_a) + locale_a = client.execute("locale -a") + verify_ordered_items_in_text(["en_GB.utf8", "en_US.utf8"], locale_a) locale_gen = client.execute( "cat /etc/locale.gen | grep -v '^#' | uniq" ) - verify_ordered_items_in_text([ - 'en_GB.UTF-8', - 'en_US.UTF-8' - ], locale_gen) + verify_ordered_items_in_text( + ["en_GB.UTF-8", "en_US.UTF-8"], locale_gen + ) def test_random_seed_data(self, class_client: IntegrationInstance): """Integration test for the random seed module. @@ -141,12 +138,12 @@ class TestCombined: def test_rsyslog(self, class_client: IntegrationInstance): """Test rsyslog is configured correctly.""" client = class_client - assert 'My test log' in client.read_from_file('/var/tmp/rsyslog.log') + assert "My test log" in client.read_from_file("/var/tmp/rsyslog.log") def test_runcmd(self, class_client: IntegrationInstance): """Test runcmd works as expected""" client = class_client - assert 'hello world' == client.read_from_file('/var/tmp/runcmd_output') + assert "hello world" == client.read_from_file("/var/tmp/runcmd_output") @retry(tries=30, delay=1) def test_ssh_import_id(self, class_client: IntegrationInstance): @@ -160,11 +157,10 @@ class TestCombined: /home/ubuntu; this will need modification to run on other OSes. """ client = class_client - ssh_output = client.read_from_file( - "/home/ubuntu/.ssh/authorized_keys") + ssh_output = client.read_from_file("/home/ubuntu/.ssh/authorized_keys") - assert '# ssh-import-id gh:powersj' in ssh_output - assert '# ssh-import-id lp:smoser' in ssh_output + assert "# ssh-import-id gh:powersj" in ssh_output + assert "# ssh-import-id lp:smoser" in ssh_output def test_snap(self, class_client: IntegrationInstance): """Integration test for the snap module. @@ -185,21 +181,22 @@ class TestCombined: """ client = class_client timezone_output = client.execute( - 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"') + 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"' + ) assert timezone_output.strip() == "HDT" def test_no_problems(self, class_client: IntegrationInstance): """Test no errors, warnings, or tracebacks""" client = class_client - status_file = client.read_from_file('/run/cloud-init/status.json') - status_json = json.loads(status_file)['v1'] - for stage in ('init', 'init-local', 'modules-config', 'modules-final'): - assert status_json[stage]['errors'] == [] - result_file = client.read_from_file('/run/cloud-init/result.json') - result_json = json.loads(result_file)['v1'] - assert result_json['errors'] == [] - - log = client.read_from_file('/var/log/cloud-init.log') + status_file = client.read_from_file("/run/cloud-init/status.json") + status_json = json.loads(status_file)["v1"] + for stage in ("init", "init-local", "modules-config", "modules-final"): + assert status_json[stage]["errors"] == [] + result_file = client.read_from_file("/run/cloud-init/result.json") + result_json = json.loads(result_file)["v1"] + assert result_json["errors"] == [] + + log = client.read_from_file("/var/log/cloud-init.log") verify_clean_log(log) def test_correct_datasource_detected( @@ -228,73 +225,81 @@ class TestCombined: ) def _check_common_metadata(self, data): - assert data['base64_encoded_keys'] == [] - assert data['merged_cfg'] == 'redacted for non-root user' + assert data["base64_encoded_keys"] == [] + assert data["merged_cfg"] == "redacted for non-root user" image_spec = ImageSpecification.from_os_image() - assert data['sys_info']['dist'][0] == image_spec.os + assert data["sys_info"]["dist"][0] == image_spec.os - v1_data = data['v1'] - assert re.match(r'\d\.\d+\.\d+-\d+', v1_data['kernel_release']) - assert v1_data['variant'] == image_spec.os - assert v1_data['distro'] == image_spec.os - assert v1_data['distro_release'] == image_spec.release - assert v1_data['machine'] == 'x86_64' - assert re.match(r'3.\d\.\d', v1_data['python_version']) + v1_data = data["v1"] + assert re.match(r"\d\.\d+\.\d+-\d+", v1_data["kernel_release"]) + assert v1_data["variant"] == image_spec.os + assert v1_data["distro"] == image_spec.os + assert v1_data["distro_release"] == image_spec.release + assert v1_data["machine"] == "x86_64" + assert re.match(r"3.\d\.\d", v1_data["python_version"]) @pytest.mark.lxd_container def test_instance_json_lxd(self, class_client: IntegrationInstance): client = class_client instance_json_file = client.read_from_file( - '/run/cloud-init/instance-data.json') + "/run/cloud-init/instance-data.json" + ) data = json.loads(instance_json_file) self._check_common_metadata(data) - v1_data = data['v1'] - assert v1_data['cloud_name'] == 'unknown' - assert v1_data['platform'] == 'lxd' - assert v1_data['subplatform'] == ( - 'seed-dir (/var/lib/cloud/seed/nocloud-net)') - assert v1_data['availability_zone'] is None - assert v1_data['instance_id'] == client.instance.name - assert v1_data['local_hostname'] == client.instance.name - assert v1_data['region'] is None + v1_data = data["v1"] + assert v1_data["cloud_name"] == "unknown" + assert v1_data["platform"] == "lxd" + assert ( + v1_data["subplatform"] + == "seed-dir (/var/lib/cloud/seed/nocloud-net)" + ) + assert v1_data["availability_zone"] is None + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"] == client.instance.name + assert v1_data["region"] is None @pytest.mark.lxd_vm def test_instance_json_lxd_vm(self, class_client: IntegrationInstance): client = class_client instance_json_file = client.read_from_file( - '/run/cloud-init/instance-data.json') + "/run/cloud-init/instance-data.json" + ) data = json.loads(instance_json_file) self._check_common_metadata(data) - v1_data = data['v1'] - assert v1_data['cloud_name'] == 'unknown' - assert v1_data['platform'] == 'lxd' - assert any([ - '/var/lib/cloud/seed/nocloud-net' in v1_data['subplatform'], - '/dev/sr0' in v1_data['subplatform'] - ]) - assert v1_data['availability_zone'] is None - assert v1_data['instance_id'] == client.instance.name - assert v1_data['local_hostname'] == client.instance.name - assert v1_data['region'] is None + v1_data = data["v1"] + assert v1_data["cloud_name"] == "unknown" + assert v1_data["platform"] == "lxd" + assert any( + [ + "/var/lib/cloud/seed/nocloud-net" in v1_data["subplatform"], + "/dev/sr0" in v1_data["subplatform"], + ] + ) + assert v1_data["availability_zone"] is None + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"] == client.instance.name + assert v1_data["region"] is None @pytest.mark.ec2 def test_instance_json_ec2(self, class_client: IntegrationInstance): client = class_client instance_json_file = client.read_from_file( - '/run/cloud-init/instance-data.json') + "/run/cloud-init/instance-data.json" + ) data = json.loads(instance_json_file) - v1_data = data['v1'] - assert v1_data['cloud_name'] == 'aws' - assert v1_data['platform'] == 'ec2' - assert v1_data['subplatform'].startswith('metadata') - assert v1_data[ - 'availability_zone'] == client.instance.availability_zone - assert v1_data['instance_id'] == client.instance.name - assert v1_data['local_hostname'].startswith('ip-') - assert v1_data['region'] == client.cloud.cloud_instance.region + v1_data = data["v1"] + assert v1_data["cloud_name"] == "aws" + assert v1_data["platform"] == "ec2" + assert v1_data["subplatform"].startswith("metadata") + assert ( + v1_data["availability_zone"] == client.instance.availability_zone + ) + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"].startswith("ip-") + assert v1_data["region"] == client.cloud.cloud_instance.region @pytest.mark.gce def test_instance_json_gce(self, class_client: IntegrationInstance): diff --git a/tests/integration_tests/modules/test_command_output.py b/tests/integration_tests/modules/test_command_output.py index 8429873f..96525cac 100644 --- a/tests/integration_tests/modules/test_command_output.py +++ b/tests/integration_tests/modules/test_command_output.py @@ -8,7 +8,6 @@ import pytest from tests.integration_tests.instances import IntegrationInstance - USER_DATA = """\ #cloud-config output: { all: "| tee -a /var/log/cloud-init-test-output" } @@ -18,5 +17,5 @@ final_message: "should be last line in cloud-init-test-output file" @pytest.mark.user_data(USER_DATA) def test_runcmd(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init-test-output') - assert 'should be last line in cloud-init-test-output file' in log + log = client.read_from_file("/var/log/cloud-init-test-output") + assert "should be last line in cloud-init-test-output file" in log diff --git a/tests/integration_tests/modules/test_disk_setup.py b/tests/integration_tests/modules/test_disk_setup.py index 9c9edc46..22277331 100644 --- a/tests/integration_tests/modules/test_disk_setup.py +++ b/tests/integration_tests/modules/test_disk_setup.py @@ -1,25 +1,29 @@ import json import os -import pytest from uuid import uuid4 + +import pytest from pycloudlib.lxd.instance import LXDInstance from cloudinit.subp import subp from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import verify_clean_log -DISK_PATH = '/tmp/test_disk_setup_{}'.format(uuid4()) +DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4()) def setup_and_mount_lxd_disk(instance: LXDInstance): - subp('lxc config device add {} test-disk-setup-disk disk source={}'.format( - instance.name, DISK_PATH).split()) + subp( + "lxc config device add {} test-disk-setup-disk disk source={}".format( + instance.name, DISK_PATH + ).split() + ) @pytest.yield_fixture def create_disk(): # 640k should be enough for anybody - subp('dd if=/dev/zero of={} bs=1k count=640'.format(DISK_PATH).split()) + subp("dd if=/dev/zero of={} bs=1k count=640".format(DISK_PATH).split()) yield os.remove(DISK_PATH) @@ -54,21 +58,21 @@ class TestDeviceAliases: """Test devices aliases work on disk setup/mount""" def test_device_alias(self, create_disk, client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") assert ( - "updated disk_setup device entry 'my_alias' to '/dev/sdb'" - ) in log - assert 'changed my_alias.1 => /dev/sdb1' in log - assert 'changed my_alias.2 => /dev/sdb2' in log + "updated disk_setup device entry 'my_alias' to '/dev/sdb'" in log + ) + assert "changed my_alias.1 => /dev/sdb1" in log + assert "changed my_alias.2 => /dev/sdb2" in log verify_clean_log(log) - lsblk = json.loads(client.execute('lsblk --json')) - sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0] - assert len(sdb['children']) == 2 - assert sdb['children'][0]['name'] == 'sdb1' - assert sdb['children'][0]['mountpoint'] == '/mnt1' - assert sdb['children'][1]['name'] == 'sdb2' - assert sdb['children'][1]['mountpoint'] == '/mnt2' + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 2 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["children"][0]["mountpoint"] == "/mnt1" + assert sdb["children"][1]["name"] == "sdb2" + assert sdb["children"][1]["mountpoint"] == "/mnt2" PARTPROBE_USERDATA = """\ @@ -121,13 +125,13 @@ class TestPartProbeAvailability: def _verify_first_disk_setup(self, client, log): verify_clean_log(log) - lsblk = json.loads(client.execute('lsblk --json')) - sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0] - assert len(sdb['children']) == 2 - assert sdb['children'][0]['name'] == 'sdb1' - assert sdb['children'][0]['mountpoint'] == '/mnt1' - assert sdb['children'][1]['name'] == 'sdb2' - assert sdb['children'][1]['mountpoint'] == '/mnt2' + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 2 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["children"][0]["mountpoint"] == "/mnt1" + assert sdb["children"][1]["name"] == "sdb2" + assert sdb["children"][1]["mountpoint"] == "/mnt2" # Not bionic or xenial because the LXD agent gets in the way of us # changing the userdata @@ -148,13 +152,13 @@ class TestPartProbeAvailability: with a warning and a traceback. When partprobe is in use, everything should work successfully. """ - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") self._verify_first_disk_setup(client, log) # Update our userdata and cloud.cfg to mount then perform new disk # setup client.write_to_file( - '/var/lib/cloud/seed/nocloud-net/user-data', + "/var/lib/cloud/seed/nocloud-net/user-data", UPDATED_PARTPROBE_USERDATA, ) client.execute( @@ -162,17 +166,17 @@ class TestPartProbeAvailability: "/etc/cloud/cloud.cfg" ) - client.execute('cloud-init clean --logs') + client.execute("cloud-init clean --logs") client.restart() # Assert new setup works as expected verify_clean_log(log) - lsblk = json.loads(client.execute('lsblk --json')) - sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0] - assert len(sdb['children']) == 1 - assert sdb['children'][0]['name'] == 'sdb1' - assert sdb['children'][0]['mountpoint'] == '/mnt3' + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 1 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["children"][0]["mountpoint"] == "/mnt3" def test_disk_setup_no_partprobe( self, create_disk, client: IntegrationInstance @@ -180,11 +184,11 @@ class TestPartProbeAvailability: """Ensure disk setup still works as expected without partprobe.""" # We can't do this part in a bootcmd because the path has already # been found by the time we get to the bootcmd - client.execute('rm $(which partprobe)') - client.execute('cloud-init clean --logs') + client.execute("rm $(which partprobe)") + client.execute("cloud-init clean --logs") client.restart() - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") self._verify_first_disk_setup(client, log) - assert 'partprobe' not in log + assert "partprobe" not in log diff --git a/tests/integration_tests/modules/test_growpart.py b/tests/integration_tests/modules/test_growpart.py index af1e3a15..67251817 100644 --- a/tests/integration_tests/modules/test_growpart.py +++ b/tests/integration_tests/modules/test_growpart.py @@ -1,22 +1,26 @@ +import json import os -import pytest import pathlib -import json from uuid import uuid4 + +import pytest from pycloudlib.lxd.instance import LXDInstance from cloudinit.subp import subp from tests.integration_tests.instances import IntegrationInstance -DISK_PATH = '/tmp/test_disk_setup_{}'.format(uuid4()) +DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4()) def setup_and_mount_lxd_disk(instance: LXDInstance): - subp('lxc config device add {} test-disk-setup-disk disk source={}'.format( - instance.name, DISK_PATH).split()) + subp( + "lxc config device add {} test-disk-setup-disk disk source={}".format( + instance.name, DISK_PATH + ).split() + ) -@pytest.fixture(scope='class', autouse=True) +@pytest.fixture(scope="class", autouse=True) def create_disk(): """Create 16M sparse file""" pathlib.Path(DISK_PATH).touch() @@ -50,13 +54,15 @@ class TestGrowPart: """Test growpart""" def test_grow_part(self, client: IntegrationInstance): - """Verify """ - log = client.read_from_file('/var/log/cloud-init.log') - assert ("cc_growpart.py[INFO]: '/dev/sdb1' resized:" - " changed (/dev/sdb, 1) from") in log - - lsblk = json.loads(client.execute('lsblk --json')) - sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0] - assert len(sdb['children']) == 1 - assert sdb['children'][0]['name'] == 'sdb1' - assert sdb['size'] == '16M' + """Verify""" + log = client.read_from_file("/var/log/cloud-init.log") + assert ( + "cc_growpart.py[INFO]: '/dev/sdb1' resized:" + " changed (/dev/sdb, 1) from" in log + ) + + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 1 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["size"] == "16M" diff --git a/tests/integration_tests/modules/test_hotplug.py b/tests/integration_tests/modules/test_hotplug.py index f5abc86f..0bad761e 100644 --- a/tests/integration_tests/modules/test_hotplug.py +++ b/tests/integration_tests/modules/test_hotplug.py @@ -1,8 +1,9 @@ -import pytest import time -import yaml from collections import namedtuple +import pytest +import yaml + from tests.integration_tests.instances import IntegrationInstance USER_DATA = """\ @@ -12,28 +13,28 @@ updates: when: ['hotplug'] """ -ip_addr = namedtuple('ip_addr', 'interface state ip4 ip6') +ip_addr = namedtuple("ip_addr", "interface state ip4 ip6") def _wait_till_hotplug_complete(client, expected_runs=1): for _ in range(60): - log = client.read_from_file('/var/log/cloud-init.log') - if log.count('Exiting hotplug handler') == expected_runs: + log = client.read_from_file("/var/log/cloud-init.log") + if log.count("Exiting hotplug handler") == expected_runs: return log time.sleep(1) - raise Exception('Waiting for hotplug handler failed') + raise Exception("Waiting for hotplug handler failed") def _get_ip_addr(client): ips = [] - lines = client.execute('ip --brief addr').split('\n') + lines = client.execute("ip --brief addr").split("\n") for line in lines: attributes = line.split() interface, state = attributes[0], attributes[1] ip4_cidr = attributes[2] if len(attributes) > 2 else None ip6_cidr = attributes[3] if len(attributes) > 3 else None - ip4 = ip4_cidr.split('/')[0] if ip4_cidr else None - ip6 = ip6_cidr.split('/')[0] if ip6_cidr else None + ip4 = ip4_cidr.split("/")[0] if ip4_cidr else None + ip6 = ip6_cidr.split("/")[0] if ip6_cidr else None ip = ip_addr(interface, state, ip4, ip6) ips.append(ip) return ips @@ -47,10 +48,10 @@ def _get_ip_addr(client): @pytest.mark.user_data(USER_DATA) def test_hotplug_add_remove(client: IntegrationInstance): ips_before = _get_ip_addr(client) - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Exiting hotplug handler' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Exiting hotplug handler" not in log assert client.execute( - 'test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules' + "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules" ).ok # Add new NIC @@ -62,11 +63,11 @@ def test_hotplug_add_remove(client: IntegrationInstance): assert len(ips_after_add) == len(ips_before) + 1 assert added_ip not in [ip.ip4 for ip in ips_before] assert added_ip in [ip.ip4 for ip in ips_after_add] - assert new_addition.state == 'UP' + assert new_addition.state == "UP" - netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml') + netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml") config = yaml.safe_load(netplan_cfg) - assert new_addition.interface in config['network']['ethernets'] + assert new_addition.interface in config["network"]["ethernets"] # Remove new NIC client.instance.remove_network_interface(added_ip) @@ -75,37 +76,37 @@ def test_hotplug_add_remove(client: IntegrationInstance): assert len(ips_after_remove) == len(ips_before) assert added_ip not in [ip.ip4 for ip in ips_after_remove] - netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml') + netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml") config = yaml.safe_load(netplan_cfg) - assert new_addition.interface not in config['network']['ethernets'] + assert new_addition.interface not in config["network"]["ethernets"] - assert 'enabled' == client.execute( - 'cloud-init devel hotplug-hook -s net query' + assert "enabled" == client.execute( + "cloud-init devel hotplug-hook -s net query" ) @pytest.mark.openstack def test_no_hotplug_in_userdata(client: IntegrationInstance): ips_before = _get_ip_addr(client) - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Exiting hotplug handler' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Exiting hotplug handler" not in log assert client.execute( - 'test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules' + "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules" ).failed # Add new NIC client.instance.add_network_interface() - log = client.read_from_file('/var/log/cloud-init.log') - assert 'hotplug-hook' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "hotplug-hook" not in log ips_after_add = _get_ip_addr(client) if len(ips_after_add) == len(ips_before) + 1: # We can see the device, but it should not have been brought up new_ip = [ip for ip in ips_after_add if ip not in ips_before][0] - assert new_ip.state == 'DOWN' + assert new_ip.state == "DOWN" else: assert len(ips_after_add) == len(ips_before) - assert 'disabled' == client.execute( - 'cloud-init devel hotplug-hook -s net query' + assert "disabled" == client.execute( + "cloud-init devel hotplug-hook -s net query" ) diff --git a/tests/integration_tests/modules/test_jinja_templating.py b/tests/integration_tests/modules/test_jinja_templating.py index fe8eff1a..7788c6f0 100644 --- a/tests/integration_tests/modules/test_jinja_templating.py +++ b/tests/integration_tests/modules/test_jinja_templating.py @@ -4,7 +4,6 @@ import pytest from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import verify_ordered_items_in_text - USER_DATA = """\ ## template: jinja #cloud-config @@ -23,12 +22,12 @@ def test_runcmd_with_variable_substitution(client: IntegrationInstance): we can also substitute variables from instance-data-sensitive LP: #1931392. """ - hostname = client.execute('hostname').stdout.strip() + hostname = client.execute("hostname").stdout.strip() expected = [ hostname, - ('Merged cloud-init system config from /etc/cloud/cloud.cfg and ' - '/etc/cloud/cloud.cfg.d/'), - hostname + "Merged cloud-init system config from /etc/cloud/cloud.cfg and " + "/etc/cloud/cloud.cfg.d/", + hostname, ] - output = client.read_from_file('/var/tmp/runcmd_output') + output = client.read_from_file("/var/tmp/runcmd_output") verify_ordered_items_in_text(expected, output) diff --git a/tests/integration_tests/modules/test_keys_to_console.py b/tests/integration_tests/modules/test_keys_to_console.py index e79db3c7..50899982 100644 --- a/tests/integration_tests/modules/test_keys_to_console.py +++ b/tests/integration_tests/modules/test_keys_to_console.py @@ -36,6 +36,7 @@ users: @pytest.mark.user_data(BLACKLIST_USER_DATA) class TestKeysToConsoleBlacklist: """Test that the blacklist options work as expected.""" + @pytest.mark.parametrize("key_type", ["DSA", "ECDSA"]) def test_excluded_keys(self, class_client, key_type): syslog = class_client.read_from_file("/var/log/syslog") @@ -55,6 +56,7 @@ class TestAllKeysToConsoleBlacklist: """Test that when key blacklist contains all key types that no header/footer are output. """ + def test_header_excluded(self, class_client): syslog = class_client.read_from_file("/var/log/syslog") assert "BEGIN SSH HOST KEY FINGERPRINTS" not in syslog @@ -67,6 +69,7 @@ class TestAllKeysToConsoleBlacklist: @pytest.mark.user_data(DISABLED_USER_DATA) class TestKeysToConsoleDisabled: """Test that output can be fully disabled.""" + @pytest.mark.parametrize("key_type", ["DSA", "ECDSA", "ED25519", "RSA"]) def test_keys_excluded(self, class_client, key_type): syslog = class_client.read_from_file("/var/log/syslog") @@ -90,7 +93,7 @@ class TestKeysToConsoleEnabled: """Test that output can be enabled disabled.""" def test_duplicate_messaging_console_log(self, class_client): - class_client.execute('cloud-init status --wait --long').ok + class_client.execute("cloud-init status --wait --long").ok try: console_log = class_client.instance.console_log() except NotImplementedError: @@ -98,13 +101,13 @@ class TestKeysToConsoleEnabled: # log pytest.skip("NotImplementedError when requesting console log") return - if console_log.lower() == 'no console output': + if console_log.lower() == "no console output": # This test retries because we might not have the full console log # on the first fetch. However, if we have no console output # at all, we don't want to keep retrying as that would trigger # another 5 minute wait on the pycloudlib side, which could # leave us waiting for a couple hours - pytest.fail('no console output') + pytest.fail("no console output") return msg = "no authorized SSH keys fingerprints found for user barfoo." assert 1 == console_log.count(msg) diff --git a/tests/integration_tests/modules/test_lxd_bridge.py b/tests/integration_tests/modules/test_lxd_bridge.py index 65dce3c7..2cb3f4f3 100644 --- a/tests/integration_tests/modules/test_lxd_bridge.py +++ b/tests/integration_tests/modules/test_lxd_bridge.py @@ -8,7 +8,6 @@ import yaml from tests.integration_tests.util import verify_clean_log - USER_DATA = """\ #cloud-config lxd: @@ -29,7 +28,6 @@ lxd: @pytest.mark.no_container @pytest.mark.user_data(USER_DATA) class TestLxdBridge: - @pytest.mark.parametrize("binary_name", ["lxc", "lxd"]) def test_binaries_installed(self, class_client, binary_name): """Check that the expected LXD binaries are installed""" diff --git a/tests/integration_tests/modules/test_ntp_servers.py b/tests/integration_tests/modules/test_ntp_servers.py index c777a641..fc62e63b 100644 --- a/tests/integration_tests/modules/test_ntp_servers.py +++ b/tests/integration_tests/modules/test_ntp_servers.py @@ -9,8 +9,8 @@ and ``tests/cloud_tests/testcases/modules/ntp_chrony.yaml``) """ import re -import yaml import pytest +import yaml from tests.integration_tests.instances import IntegrationInstance @@ -33,13 +33,13 @@ EXPECTED_POOLS = yaml.safe_load(USER_DATA)["ntp"]["pools"] @pytest.mark.user_data(USER_DATA) class TestNtpServers: - def test_ntp_installed(self, class_client: IntegrationInstance): """Test that `ntpd --version` succeeds, indicating installation.""" assert class_client.execute("ntpd --version").ok - def test_dist_config_file_is_empty(self, - class_client: IntegrationInstance): + def test_dist_config_file_is_empty( + self, class_client: IntegrationInstance + ): """Test that the distributed config file is empty. (This test is skipped on all currently supported Ubuntu releases, so @@ -56,13 +56,13 @@ class TestNtpServers: assert re.search( r"^server {} iburst".format(expected_server), ntp_conf, - re.MULTILINE + re.MULTILINE, ) for expected_pool in EXPECTED_POOLS: assert re.search( r"^pool {} iburst".format(expected_pool), ntp_conf, - re.MULTILINE + re.MULTILINE, ) def test_ntpq_servers(self, class_client: IntegrationInstance): @@ -84,12 +84,12 @@ ntp: @pytest.mark.user_data(CHRONY_DATA) def test_chrony(client: IntegrationInstance): - if client.execute('test -f /etc/chrony.conf').ok: - chrony_conf = '/etc/chrony.conf' + if client.execute("test -f /etc/chrony.conf").ok: + chrony_conf = "/etc/chrony.conf" else: - chrony_conf = '/etc/chrony/chrony.conf' + chrony_conf = "/etc/chrony/chrony.conf" contents = client.read_from_file(chrony_conf) - assert 'server 172.16.15.14' in contents + assert "server 172.16.15.14" in contents TIMESYNCD_DATA = """\ @@ -105,9 +105,9 @@ ntp: @pytest.mark.user_data(TIMESYNCD_DATA) def test_timesyncd(client: IntegrationInstance): contents = client.read_from_file( - '/etc/systemd/timesyncd.conf.d/cloud-init.conf' + "/etc/systemd/timesyncd.conf.d/cloud-init.conf" ) - assert 'NTP=172.16.15.14' in contents + assert "NTP=172.16.15.14" in contents EMPTY_NTP = """\ @@ -121,8 +121,8 @@ ntp: @pytest.mark.user_data(EMPTY_NTP) def test_empty_ntp(client: IntegrationInstance): - assert client.execute('ntpd --version').ok - assert client.execute('test -f /etc/ntp.conf.dist').failed - assert 'pool.ntp.org iburst' in client.execute( + assert client.execute("ntpd --version").ok + assert client.execute("test -f /etc/ntp.conf.dist").failed + assert "pool.ntp.org iburst" in client.execute( 'grep -v "^#" /etc/ntp.conf' ) diff --git a/tests/integration_tests/modules/test_package_update_upgrade_install.py b/tests/integration_tests/modules/test_package_update_upgrade_install.py index 28d741bc..d668d81c 100644 --- a/tests/integration_tests/modules/test_package_update_upgrade_install.py +++ b/tests/integration_tests/modules/test_package_update_upgrade_install.py @@ -13,8 +13,8 @@ NOTE: the testcase for this looks for the command in history.log as """ import re -import pytest +import pytest USER_DATA = """\ #cloud-config @@ -29,7 +29,6 @@ package_upgrade: true @pytest.mark.ubuntu @pytest.mark.user_data(USER_DATA) class TestPackageUpdateUpgradeInstall: - def assert_package_installed(self, pkg_out, name, version=None): """Check dpkg-query --show output for matching package name. @@ -38,7 +37,8 @@ class TestPackageUpdateUpgradeInstall: version. """ pkg_match = re.search( - "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE) + "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE + ) if pkg_match: installed_version = pkg_match.group("version") if not version: @@ -46,8 +46,10 @@ class TestPackageUpdateUpgradeInstall: if installed_version.startswith(version): return # Success raise AssertionError( - "Expected package version %s-%s not found. Found %s" % - name, version, installed_version) + "Expected package version %s-%s not found. Found %s" % name, + version, + installed_version, + ) raise AssertionError("Package not installed: %s" % name) def test_new_packages_are_installed(self, class_client): @@ -58,11 +60,13 @@ class TestPackageUpdateUpgradeInstall: def test_packages_were_updated(self, class_client): out = class_client.execute( - "grep ^Commandline: /var/log/apt/history.log") + "grep ^Commandline: /var/log/apt/history.log" + ) assert ( "Commandline: /usr/bin/apt-get --option=Dpkg::Options" "::=--force-confold --option=Dpkg::options::=--force-unsafe-io " - "--assume-yes --quiet install sl tree") in out + "--assume-yes --quiet install sl tree" in out + ) def test_packages_were_upgraded(self, class_client): """Test cloud-init-output for install & upgrade stuff.""" diff --git a/tests/integration_tests/modules/test_persistence.py b/tests/integration_tests/modules/test_persistence.py index 00fdeaea..33527e1e 100644 --- a/tests/integration_tests/modules/test_persistence.py +++ b/tests/integration_tests/modules/test_persistence.py @@ -10,21 +10,23 @@ from tests.integration_tests.util import ( verify_ordered_items_in_text, ) - -PICKLE_PATH = Path('/var/lib/cloud/instance/obj.pkl') -TEST_PICKLE = ASSETS_DIR / 'trusty_with_mime.pkl' +PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl") +TEST_PICKLE = ASSETS_DIR / "trusty_with_mime.pkl" @pytest.mark.lxd_container def test_log_message_on_missing_version_file(client: IntegrationInstance): client.push_file(TEST_PICKLE, PICKLE_PATH) client.restart() - assert client.execute('cloud-init status --wait').ok - log = client.read_from_file('/var/log/cloud-init.log') - verify_ordered_items_in_text([ - "Unable to unpickle datasource: 'MIMEMultipart' object has no " - "attribute 'policy'. Ignoring current cache.", - 'no cache found', - 'Searching for local data source', - 'SUCCESS: found local data from DataSourceNoCloud' - ], log) + assert client.execute("cloud-init status --wait").ok + log = client.read_from_file("/var/log/cloud-init.log") + verify_ordered_items_in_text( + [ + "Unable to unpickle datasource: 'MIMEMultipart' object has no " + "attribute 'policy'. Ignoring current cache.", + "no cache found", + "Searching for local data source", + "SUCCESS: found local data from DataSourceNoCloud", + ], + log, + ) diff --git a/tests/integration_tests/modules/test_power_state_change.py b/tests/integration_tests/modules/test_power_state_change.py index 5f3a32ac..a629029d 100644 --- a/tests/integration_tests/modules/test_power_state_change.py +++ b/tests/integration_tests/modules/test_power_state_change.py @@ -30,7 +30,7 @@ def _detect_reboot(instance: IntegrationInstance): instance.instance.wait() for _ in range(600): try: - log = instance.read_from_file('/var/log/cloud-init.log') + log = instance.read_from_file("/var/log/cloud-init.log") boot_count = log.count("running 'init-local'") if boot_count == 1: instance.instance.wait() @@ -40,11 +40,11 @@ def _detect_reboot(instance: IntegrationInstance): pass time.sleep(1) else: - raise Exception('Could not detect reboot') + raise Exception("Could not detect reboot") def _can_connect(instance): - return instance.execute('true').ok + return instance.execute("true").ok # This test is marked unstable because even though it should be able to @@ -55,36 +55,44 @@ def _can_connect(instance): @pytest.mark.ubuntu @pytest.mark.lxd_container class TestPowerChange: - @pytest.mark.parametrize('mode,delay,timeout,expected', [ - ('poweroff', 'now', '10', 'will execute: shutdown -P now msg'), - ('reboot', 'now', '0', 'will execute: shutdown -r now msg'), - ('halt', '+1', '0', 'will execute: shutdown -H +1 msg'), - ]) - def test_poweroff(self, session_cloud: IntegrationCloud, - mode, delay, timeout, expected): + @pytest.mark.parametrize( + "mode,delay,timeout,expected", + [ + ("poweroff", "now", "10", "will execute: shutdown -P now msg"), + ("reboot", "now", "0", "will execute: shutdown -r now msg"), + ("halt", "+1", "0", "will execute: shutdown -H +1 msg"), + ], + ) + def test_poweroff( + self, session_cloud: IntegrationCloud, mode, delay, timeout, expected + ): with session_cloud.launch( user_data=USER_DATA.format( - delay=delay, mode=mode, timeout=timeout, condition='true'), - launch_kwargs={'wait': False}, + delay=delay, mode=mode, timeout=timeout, condition="true" + ), + launch_kwargs={"wait": False}, ) as instance: - if mode == 'reboot': + if mode == "reboot": _detect_reboot(instance) else: instance.instance.wait_for_stop() instance.instance.start(wait=True) - log = instance.read_from_file('/var/log/cloud-init.log') + log = instance.read_from_file("/var/log/cloud-init.log") assert _can_connect(instance) lines_to_check = [ - 'Running module power-state-change', + "Running module power-state-change", expected, "running 'init-local'", - 'config-power-state-change already ran', + "config-power-state-change already ran", ] verify_ordered_items_in_text(lines_to_check, log) - @pytest.mark.user_data(USER_DATA.format(delay='0', mode='poweroff', - timeout='0', condition='false')) + @pytest.mark.user_data( + USER_DATA.format( + delay="0", mode="poweroff", timeout="0", condition="false" + ) + ) def test_poweroff_false_condition(self, client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") assert _can_connect(client) - assert 'Condition was false. Will not perform state change' in log + assert "Condition was false. Will not perform state change" in log diff --git a/tests/integration_tests/modules/test_puppet.py b/tests/integration_tests/modules/test_puppet.py index f40a6ca3..1bd9cee4 100644 --- a/tests/integration_tests/modules/test_puppet.py +++ b/tests/integration_tests/modules/test_puppet.py @@ -15,9 +15,9 @@ puppet: @pytest.mark.user_data(SERVICE_DATA) def test_puppet_service(client: IntegrationInstance): """Basic test that puppet gets installed and runs.""" - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") verify_clean_log(log) - assert client.execute('systemctl is-active puppet').ok + assert client.execute("systemctl is-active puppet").ok assert "Running command ['puppet', 'agent'" not in log @@ -35,5 +35,5 @@ puppet: @pytest.mark.user_data(EXEC_DATA) def test_pupet_exec(client: IntegrationInstance): """Basic test that puppet gets installed and runs.""" - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") assert "Running command ['puppet', 'agent', '--noop']" in log diff --git a/tests/integration_tests/modules/test_set_hostname.py b/tests/integration_tests/modules/test_set_hostname.py index e7f7f6b6..ae0aeae9 100644 --- a/tests/integration_tests/modules/test_set_hostname.py +++ b/tests/integration_tests/modules/test_set_hostname.py @@ -11,7 +11,6 @@ after the system is boot. import pytest - USER_DATA_HOSTNAME = """\ #cloud-config hostname: cloudinit2 @@ -34,7 +33,6 @@ fqdn: cloudinit2.test.io @pytest.mark.ci class TestHostname: - @pytest.mark.user_data(USER_DATA_HOSTNAME) def test_hostname(self, client): hostname_output = client.execute("hostname") @@ -59,6 +57,8 @@ class TestHostname: assert "cloudinit2.i9n.cloud-init.io" in fqdn_output.strip() host_output = client.execute("grep ^127 /etc/hosts") - assert '127.0.1.1 {} {}'.format( - fqdn_output, hostname_output) in host_output - assert '127.0.0.1 localhost' in host_output + assert ( + "127.0.1.1 {} {}".format(fqdn_output, hostname_output) + in host_output + ) + assert "127.0.0.1 localhost" in host_output diff --git a/tests/integration_tests/modules/test_set_password.py b/tests/integration_tests/modules/test_set_password.py index ac9db19d..e0f8b692 100644 --- a/tests/integration_tests/modules/test_set_password.py +++ b/tests/integration_tests/modules/test_set_password.py @@ -15,7 +15,6 @@ import yaml from tests.integration_tests.util import retry - COMMON_USER_DATA = """\ #cloud-config ssh_pwauth: yes @@ -42,7 +41,9 @@ Uh69tP4GSrGW5XKHxMLiKowJgm/" lock_passwd: false """ -LIST_USER_DATA = COMMON_USER_DATA + """ +LIST_USER_DATA = ( + COMMON_USER_DATA + + """ chpasswd: list: - tom:mypassword123! @@ -50,8 +51,11 @@ chpasswd: - harry:RANDOM - mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89 """ +) -STRING_USER_DATA = COMMON_USER_DATA + """ +STRING_USER_DATA = ( + COMMON_USER_DATA + + """ chpasswd: list: | tom:mypassword123! @@ -59,6 +63,7 @@ chpasswd: harry:RANDOM mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89 """ +) USERS_DICTS = yaml.safe_load(COMMON_USER_DATA)["users"] USERS_PASSWD_VALUES = { @@ -141,13 +146,13 @@ class Mixin: # log pytest.skip("NotImplementedError when requesting console log") return - if console_log.lower() == 'no console output': + if console_log.lower() == "no console output": # This test retries because we might not have the full console log # on the first fetch. However, if we have no console output # at all, we don't want to keep retrying as that would trigger # another 5 minute wait on the pycloudlib side, which could # leave us waiting for a couple hours - pytest.fail('no console output') + pytest.fail("no console output") return assert "dick:" in console_log assert "harry:" in console_log diff --git a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py index cf14d0b0..89b49576 100644 --- a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py +++ b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py @@ -14,7 +14,6 @@ import pytest from tests.integration_tests.util import retry - USER_DATA_SSH_AUTHKEY_DISABLE = """\ #cloud-config no_ssh_fingerprints: true @@ -32,13 +31,13 @@ ssh_authorized_keys: @pytest.mark.ci class TestSshAuthkeyFingerprints: - @pytest.mark.user_data(USER_DATA_SSH_AUTHKEY_DISABLE) def test_ssh_authkey_fingerprints_disable(self, client): cloudinit_output = client.read_from_file("/var/log/cloud-init.log") assert ( "Skipping module named ssh-authkey-fingerprints, " - "logging of SSH fingerprints disabled") in cloudinit_output + "logging of SSH fingerprints disabled" in cloudinit_output + ) # retry decorator here because it can take some time to be reflected # in syslog @@ -47,7 +46,7 @@ class TestSshAuthkeyFingerprints: def test_ssh_authkey_fingerprints_enable(self, client): syslog_output = client.read_from_file("/var/log/syslog") - assert re.search(r'256 SHA256:.*(ECDSA)', syslog_output) is not None - assert re.search(r'256 SHA256:.*(ED25519)', syslog_output) is not None - assert re.search(r'1024 SHA256:.*(DSA)', syslog_output) is None - assert re.search(r'2048 SHA256:.*(RSA)', syslog_output) is None + assert re.search(r"256 SHA256:.*(ECDSA)", syslog_output) is not None + assert re.search(r"256 SHA256:.*(ED25519)", syslog_output) is not None + assert re.search(r"1024 SHA256:.*(DSA)", syslog_output) is None + assert re.search(r"2048 SHA256:.*(RSA)", syslog_output) is None diff --git a/tests/integration_tests/modules/test_ssh_generate.py b/tests/integration_tests/modules/test_ssh_generate.py index 60c36982..1dd0adf1 100644 --- a/tests/integration_tests/modules/test_ssh_generate.py +++ b/tests/integration_tests/modules/test_ssh_generate.py @@ -10,7 +10,6 @@ keys were created. import pytest - USER_DATA = """\ #cloud-config ssh_genkeytypes: @@ -23,28 +22,27 @@ authkey_hash: sha512 @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestSshKeysGenerate: - @pytest.mark.parametrize( - "ssh_key_path", ( + "ssh_key_path", + ( "/etc/ssh/ssh_host_dsa_key.pub", "/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_rsa_key.pub", "/etc/ssh/ssh_host_rsa_key", - ) + ), ) def test_ssh_keys_not_generated(self, ssh_key_path, class_client): - out = class_client.execute( - "test -e {}".format(ssh_key_path) - ) + out = class_client.execute("test -e {}".format(ssh_key_path)) assert out.failed @pytest.mark.parametrize( - "ssh_key_path", ( + "ssh_key_path", + ( "/etc/ssh/ssh_host_ecdsa_key.pub", "/etc/ssh/ssh_host_ecdsa_key", "/etc/ssh/ssh_host_ed25519_key.pub", "/etc/ssh/ssh_host_ed25519_key", - ) + ), ) def test_ssh_keys_generated(self, ssh_key_path, class_client): out = class_client.read_from_file(ssh_key_path) diff --git a/tests/integration_tests/modules/test_ssh_keys_provided.py b/tests/integration_tests/modules/test_ssh_keys_provided.py index 6aae96ae..b79f18eb 100644 --- a/tests/integration_tests/modules/test_ssh_keys_provided.py +++ b/tests/integration_tests/modules/test_ssh_keys_provided.py @@ -9,7 +9,6 @@ system. import pytest - USER_DATA = """\ #cloud-config disable_root: false @@ -82,44 +81,33 @@ ssh_keys: @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestSshKeysProvided: - @pytest.mark.parametrize( "config_path,expected_out", ( ( "/etc/ssh/ssh_host_dsa_key.pub", - ( - "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R" - "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM" - ), + "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R" + "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM", ), ( "/etc/ssh/ssh_host_dsa_key", - ( - "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr" - "hOVAfzZ6+jklP" - ), + "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr" + "hOVAfzZ6+jklP", ), ( "/etc/ssh/ssh_host_rsa_key.pub", - ( - "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT" - "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4" - ), + "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT" + "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4", ), ( "/etc/ssh/ssh_host_rsa_key", - ( - "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un" - "RQvLZpMRdywBm" - ), + "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un" + "RQvLZpMRdywBm", ), ( "/etc/ssh/ssh_host_rsa_key-cert.pub", - ( - "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg" - "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD" - ), + "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg" + "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD", ), ( "/etc/ssh/sshd_config", @@ -127,33 +115,25 @@ class TestSshKeysProvided: ), ( "/etc/ssh/ssh_host_ecdsa_key.pub", - ( - "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB" - "BBFsS5Tvky/IC/dXhE/afxxU" - ), + "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB" + "BBFsS5Tvky/IC/dXhE/afxxU", ), ( "/etc/ssh/ssh_host_ecdsa_key", - ( - "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY" - "5mpZqxgX4vcgb" - ), + "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY" + "5mpZqxgX4vcgb", ), ( "/etc/ssh/ssh_host_ed25519_key.pub", - ( - "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6" - "G15dqjQ2XkNVOEnb5" - ), + "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6" + "G15dqjQ2XkNVOEnb5", ), ( "/etc/ssh/ssh_host_ed25519_key", - ( - "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT" - "OhteXao0Nl5DVThJ2+Q" - ), + "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT" + "OhteXao0Nl5DVThJ2+Q", ), - ) + ), ) def test_ssh_provided_keys(self, config_path, expected_out, class_client): out = class_client.read_from_file(config_path).strip() diff --git a/tests/integration_tests/modules/test_ssh_keysfile.py b/tests/integration_tests/modules/test_ssh_keysfile.py index b39454e6..8330a1ce 100644 --- a/tests/integration_tests/modules/test_ssh_keysfile.py +++ b/tests/integration_tests/modules/test_ssh_keysfile.py @@ -1,15 +1,16 @@ +from io import StringIO + import paramiko import pytest -from io import StringIO from paramiko.ssh_exception import SSHException from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import get_test_rsa_keypair -TEST_USER1_KEYS = get_test_rsa_keypair('test1') -TEST_USER2_KEYS = get_test_rsa_keypair('test2') -TEST_DEFAULT_KEYS = get_test_rsa_keypair('test3') +TEST_USER1_KEYS = get_test_rsa_keypair("test1") +TEST_USER2_KEYS = get_test_rsa_keypair("test2") +TEST_DEFAULT_KEYS = get_test_rsa_keypair("test3") _USERDATA = """\ #cloud-config @@ -26,7 +27,7 @@ users: ssh_authorized_keys: - {user2} """.format( - bootcmd='{bootcmd}', + bootcmd="{bootcmd}", default=TEST_DEFAULT_KEYS.public_key, user1=TEST_USER1_KEYS.public_key, user2=TEST_USER2_KEYS.public_key, @@ -37,9 +38,9 @@ def common_verify(client, expected_keys): for user, filename, keys in expected_keys: # Ensure key is in the key file contents = client.read_from_file(filename) - if user in ['ubuntu', 'root']: - lines = contents.split('\n') - if user == 'root': + if user in ["ubuntu", "root"]: + lines = contents.split("\n") + if user == "root": # Our personal public key gets added by pycloudlib in # addition to the default `ssh_authorized_keys` assert len(lines) == 2 @@ -54,8 +55,9 @@ def common_verify(client, expected_keys): # Ensure we can actually connect ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - paramiko_key = paramiko.RSAKey.from_private_key(StringIO( - keys.private_key)) + paramiko_key = paramiko.RSAKey.from_private_key( + StringIO(keys.private_key) + ) # Will fail with AuthenticationException if # we cannot connect @@ -71,8 +73,11 @@ def common_verify(client, expected_keys): other_users = [u[0] for u in expected_keys if u[2] != keys] for other_user in other_users: with pytest.raises(SSHException): - print('trying to connect as {} with key from {}'.format( - other_user, user)) + print( + "trying to connect as {} with key from {}".format( + other_user, user + ) + ) ssh.connect( client.instance.ip, username=other_user, @@ -83,37 +88,38 @@ def common_verify(client, expected_keys): # Ensure we haven't messed with any /home permissions # See LP: #1940233 - home_dir = '/home/{}'.format(user) + home_dir = "/home/{}".format(user) # Home permissions aren't consistent between releases. On ubuntu # this can change to 750 once focal is unsupported. if ImageSpecification.from_os_image().release in ("bionic", "focal"): - home_perms = '755' + home_perms = "755" else: - home_perms = '750' - if user == 'root': - home_dir = '/root' - home_perms = '700' - assert '{} {}'.format(user, home_perms) == client.execute( + home_perms = "750" + if user == "root": + home_dir = "/root" + home_perms = "700" + assert "{} {}".format(user, home_perms) == client.execute( 'stat -c "%U %a" {}'.format(home_dir) ) if client.execute("test -d {}/.ssh".format(home_dir)).ok: - assert '{} 700'.format(user) == client.execute( + assert "{} 700".format(user) == client.execute( 'stat -c "%U %a" {}/.ssh'.format(home_dir) ) - assert '{} 600'.format(user) == client.execute( + assert "{} 600".format(user) == client.execute( 'stat -c "%U %a" {}'.format(filename) ) # Also ensure ssh-keygen works as expected - client.execute('mkdir {}/.ssh'.format(home_dir)) + client.execute("mkdir {}/.ssh".format(home_dir)) assert client.execute( "ssh-keygen -b 2048 -t rsa -f {}/.ssh/id_rsa -q -N ''".format( - home_dir) + home_dir + ) ).ok - assert client.execute('test -f {}/.ssh/id_rsa'.format(home_dir)) - assert client.execute('test -f {}/.ssh/id_rsa.pub'.format(home_dir)) + assert client.execute("test -f {}/.ssh/id_rsa".format(home_dir)) + assert client.execute("test -f {}/.ssh/id_rsa.pub".format(home_dir)) - assert 'root 755' == client.execute('stat -c "%U %a" /home') + assert "root 755" == client.execute('stat -c "%U %a" /home') DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""') @@ -123,75 +129,96 @@ DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""') @pytest.mark.user_data(DEFAULT_KEYS_USERDATA) def test_authorized_keys_default(client: IntegrationInstance): expected_keys = [ - ('test_user1', '/home/test_user1/.ssh/authorized_keys', - TEST_USER1_KEYS), - ('test_user2', '/home/test_user2/.ssh/authorized_keys', - TEST_USER2_KEYS), - ('ubuntu', '/home/ubuntu/.ssh/authorized_keys', - TEST_DEFAULT_KEYS), - ('root', '/root/.ssh/authorized_keys', TEST_DEFAULT_KEYS), + ( + "test_user1", + "/home/test_user1/.ssh/authorized_keys", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/home/test_user2/.ssh/authorized_keys", + TEST_USER2_KEYS, + ), + ("ubuntu", "/home/ubuntu/.ssh/authorized_keys", TEST_DEFAULT_KEYS), + ("root", "/root/.ssh/authorized_keys", TEST_DEFAULT_KEYS), ] common_verify(client, expected_keys) -AUTHORIZED_KEYS2_USERDATA = _USERDATA.format(bootcmd=( - "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " - "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' " - "/etc/ssh/sshd_config")) +AUTHORIZED_KEYS2_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' " + "/etc/ssh/sshd_config" + ) +) @pytest.mark.ubuntu @pytest.mark.user_data(AUTHORIZED_KEYS2_USERDATA) def test_authorized_keys2(client: IntegrationInstance): expected_keys = [ - ('test_user1', '/home/test_user1/.ssh/authorized_keys2', - TEST_USER1_KEYS), - ('test_user2', '/home/test_user2/.ssh/authorized_keys2', - TEST_USER2_KEYS), - ('ubuntu', '/home/ubuntu/.ssh/authorized_keys2', - TEST_DEFAULT_KEYS), - ('root', '/root/.ssh/authorized_keys2', TEST_DEFAULT_KEYS), + ( + "test_user1", + "/home/test_user1/.ssh/authorized_keys2", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/home/test_user2/.ssh/authorized_keys2", + TEST_USER2_KEYS, + ), + ("ubuntu", "/home/ubuntu/.ssh/authorized_keys2", TEST_DEFAULT_KEYS), + ("root", "/root/.ssh/authorized_keys2", TEST_DEFAULT_KEYS), ] common_verify(client, expected_keys) -NESTED_KEYS_USERDATA = _USERDATA.format(bootcmd=( - "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " - "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' " - "/etc/ssh/sshd_config")) +NESTED_KEYS_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' " + "/etc/ssh/sshd_config" + ) +) @pytest.mark.ubuntu @pytest.mark.user_data(NESTED_KEYS_USERDATA) def test_nested_keys(client: IntegrationInstance): expected_keys = [ - ('test_user1', '/home/test_user1/foo/bar/ssh/keys', - TEST_USER1_KEYS), - ('test_user2', '/home/test_user2/foo/bar/ssh/keys', - TEST_USER2_KEYS), - ('ubuntu', '/home/ubuntu/foo/bar/ssh/keys', - TEST_DEFAULT_KEYS), - ('root', '/root/foo/bar/ssh/keys', TEST_DEFAULT_KEYS), + ("test_user1", "/home/test_user1/foo/bar/ssh/keys", TEST_USER1_KEYS), + ("test_user2", "/home/test_user2/foo/bar/ssh/keys", TEST_USER2_KEYS), + ("ubuntu", "/home/ubuntu/foo/bar/ssh/keys", TEST_DEFAULT_KEYS), + ("root", "/root/foo/bar/ssh/keys", TEST_DEFAULT_KEYS), ] common_verify(client, expected_keys) -EXTERNAL_KEYS_USERDATA = _USERDATA.format(bootcmd=( - "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " - "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' " - "/etc/ssh/sshd_config")) +EXTERNAL_KEYS_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' " + "/etc/ssh/sshd_config" + ) +) @pytest.mark.ubuntu @pytest.mark.user_data(EXTERNAL_KEYS_USERDATA) def test_external_keys(client: IntegrationInstance): expected_keys = [ - ('test_user1', '/etc/ssh/authorized_keys/test_user1/keys', - TEST_USER1_KEYS), - ('test_user2', '/etc/ssh/authorized_keys/test_user2/keys', - TEST_USER2_KEYS), - ('ubuntu', '/etc/ssh/authorized_keys/ubuntu/keys', - TEST_DEFAULT_KEYS), - ('root', '/etc/ssh/authorized_keys/root/keys', TEST_DEFAULT_KEYS), + ( + "test_user1", + "/etc/ssh/authorized_keys/test_user1/keys", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/etc/ssh/authorized_keys/test_user2/keys", + TEST_USER2_KEYS, + ), + ("ubuntu", "/etc/ssh/authorized_keys/ubuntu/keys", TEST_DEFAULT_KEYS), + ("root", "/etc/ssh/authorized_keys/root/keys", TEST_DEFAULT_KEYS), ] common_verify(client, expected_keys) diff --git a/tests/integration_tests/modules/test_user_events.py b/tests/integration_tests/modules/test_user_events.py index fffa0746..e4a4241f 100644 --- a/tests/integration_tests/modules/test_user_events.py +++ b/tests/integration_tests/modules/test_user_events.py @@ -3,8 +3,9 @@ This is currently limited to applying network config on BOOT events. """ -import pytest import re + +import pytest import yaml from tests.integration_tests.instances import IntegrationInstance @@ -13,16 +14,16 @@ from tests.integration_tests.instances import IntegrationInstance def _add_dummy_bridge_to_netplan(client: IntegrationInstance): # Update netplan configuration to ensure it doesn't change on reboot netplan = yaml.safe_load( - client.execute('cat /etc/netplan/50-cloud-init.yaml') + client.execute("cat /etc/netplan/50-cloud-init.yaml") ) # Just a dummy bridge to do nothing try: - netplan['network']['bridges']['dummy0'] = {'dhcp4': False} + netplan["network"]["bridges"]["dummy0"] = {"dhcp4": False} except KeyError: - netplan['network']['bridges'] = {'dummy0': {'dhcp4': False}} + netplan["network"]["bridges"] = {"dummy0": {"dhcp4": False}} dumped_netplan = yaml.dump(netplan) - client.write_to_file('/etc/netplan/50-cloud-init.yaml', dumped_netplan) + client.write_to_file("/etc/netplan/50-cloud-init.yaml", dumped_netplan) @pytest.mark.lxd_container @@ -32,19 +33,19 @@ def _add_dummy_bridge_to_netplan(client: IntegrationInstance): @pytest.mark.oci @pytest.mark.openstack def test_boot_event_disabled_by_default(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') - if 'network config is disabled' in log: + log = client.read_from_file("/var/log/cloud-init.log") + if "network config is disabled" in log: pytest.skip("network config disabled. Test doesn't apply") - assert 'Applying network configuration' in log - assert 'dummy0' not in client.execute('ls /sys/class/net') + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") _add_dummy_bridge_to_netplan(client) - client.execute('rm /var/log/cloud-init.log') + client.execute("rm /var/log/cloud-init.log") client.restart() - log2 = client.read_from_file('/var/log/cloud-init.log') + log2 = client.read_from_file("/var/log/cloud-init.log") - if 'cache invalid in datasource' in log2: + if "cache invalid in datasource" in log2: # Invalid cache will get cleared, meaning we'll create a new # "instance" and apply networking config, so events aren't # really relevant here @@ -53,8 +54,9 @@ def test_boot_event_disabled_by_default(client: IntegrationInstance): # We attempt to apply network config twice on every boot. # Ensure neither time works. assert 2 == len( - re.findall(r"Event Denied: scopes=\['network'\] EventType=boot[^-]", - log2) + re.findall( + r"Event Denied: scopes=\['network'\] EventType=boot[^-]", log2 + ) ) assert 2 == log2.count( "Event Denied: scopes=['network'] EventType=boot-legacy" @@ -64,30 +66,30 @@ def test_boot_event_disabled_by_default(client: IntegrationInstance): " nor datasource network update allowed" ) - assert 'dummy0' in client.execute('ls /sys/class/net') + assert "dummy0" in client.execute("ls /sys/class/net") def _test_network_config_applied_on_reboot(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') - if 'network config is disabled' in log: + log = client.read_from_file("/var/log/cloud-init.log") + if "network config is disabled" in log: pytest.skip("network config disabled. Test doesn't apply") - assert 'Applying network configuration' in log - assert 'dummy0' not in client.execute('ls /sys/class/net') + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") _add_dummy_bridge_to_netplan(client) client.execute('echo "" > /var/log/cloud-init.log') client.restart() - log = client.read_from_file('/var/log/cloud-init.log') - if 'cache invalid in datasource' in log: + log = client.read_from_file("/var/log/cloud-init.log") + if "cache invalid in datasource" in log: # Invalid cache will get cleared, meaning we'll create a new # "instance" and apply networking config, so events aren't # really relevant here pytest.skip("Test only valid for existing instances") - assert 'Event Allowed: scope=network EventType=boot' in log - assert 'Applying network configuration' in log - assert 'dummy0' not in client.execute('ls /sys/class/net') + assert "Event Allowed: scope=network EventType=boot" in log + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") @pytest.mark.azure diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py index bcb17b7f..3d1358ce 100644 --- a/tests/integration_tests/modules/test_users_groups.py +++ b/tests/integration_tests/modules/test_users_groups.py @@ -11,7 +11,6 @@ import pytest from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance - USER_DATA = """\ #cloud-config # Add groups to the system @@ -84,7 +83,9 @@ class TestUsersGroups: assert re.search(regex, result.stdout) is not None, ( "'getent {}' resulted in '{}', " "but expected to match regex {}".format( - ' '.join(getent_args), result.stdout, regex)) + " ".join(getent_args), result.stdout, regex + ) + ) def test_user_root_in_secret(self, class_client): """Test root user is in 'secret' group.""" @@ -105,19 +106,21 @@ def test_sudoers_includedir(client: IntegrationInstance): https://github.com/canonical/cloud-init/pull/783 """ if ImageSpecification.from_os_image().release in [ - 'xenial', 'bionic', 'focal' + "xenial", + "bionic", + "focal", ]: raise pytest.skip( - 'Test requires version of sudo installed on groovy and later' + "Test requires version of sudo installed on groovy and later" ) client.execute("sed -i 's/#include/@include/g' /etc/sudoers") - sudoers = client.read_from_file('/etc/sudoers') - if '@includedir /etc/sudoers.d' not in sudoers: + sudoers = client.read_from_file("/etc/sudoers") + if "@includedir /etc/sudoers.d" not in sudoers: client.execute("echo '@includedir /etc/sudoers.d' >> /etc/sudoers") client.instance.clean() client.restart() - sudoers = client.read_from_file('/etc/sudoers') + sudoers = client.read_from_file("/etc/sudoers") - assert '#includedir' not in sudoers - assert sudoers.count('includedir /etc/sudoers.d') == 1 + assert "#includedir" not in sudoers + assert sudoers.count("includedir /etc/sudoers.d") == 1 diff --git a/tests/integration_tests/modules/test_version_change.py b/tests/integration_tests/modules/test_version_change.py index f28079d4..3168cd60 100644 --- a/tests/integration_tests/modules/test_version_change.py +++ b/tests/integration_tests/modules/test_version_change.py @@ -5,39 +5,40 @@ import pytest from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import ASSETS_DIR, verify_clean_log - -PICKLE_PATH = Path('/var/lib/cloud/instance/obj.pkl') -TEST_PICKLE = ASSETS_DIR / 'test_version_change.pkl' +PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl") +TEST_PICKLE = ASSETS_DIR / "test_version_change.pkl" def _assert_no_pickle_problems(log): - assert 'Failed loading pickled blob' not in log + assert "Failed loading pickled blob" not in log verify_clean_log(log) def test_reboot_without_version_change(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Python version change detected' not in log - assert 'Cache compatibility status is currently unknown.' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected" not in log + assert "Cache compatibility status is currently unknown." not in log _assert_no_pickle_problems(log) client.restart() - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Python version change detected' not in log - assert 'Could not determine Python version used to write cache' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected" not in log + assert "Could not determine Python version used to write cache" not in log _assert_no_pickle_problems(log) # Now ensure that loading a bad pickle gives us problems client.push_file(TEST_PICKLE, PICKLE_PATH) client.restart() - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") # no cache found is an "expected" upgrade error, and # "Failed" means we're unable to load the pickle - assert any([ - 'Failed loading pickled blob from {}'.format(PICKLE_PATH) in log, - 'no cache found' in log - ]) + assert any( + [ + "Failed loading pickled blob from {}".format(PICKLE_PATH) in log, + "no cache found" in log, + ] + ) @pytest.mark.ec2 @@ -54,8 +55,8 @@ def test_cache_purged_on_version_change(client: IntegrationInstance): client.push_file(TEST_PICKLE, PICKLE_PATH) client.execute("echo '1.0' > /var/lib/cloud/data/python-version") client.restart() - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Python version change detected. Purging cache' in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected. Purging cache" in log _assert_no_pickle_problems(log) @@ -65,11 +66,11 @@ def test_log_message_on_missing_version_file(client: IntegrationInstance): client.execute("rm /var/lib/cloud/data/python-version") client.execute("rm /var/log/cloud-init.log") client.restart() - log = client.read_from_file('/var/log/cloud-init.log') - if 'no cache found' not in log: + log = client.read_from_file("/var/log/cloud-init.log") + if "no cache found" not in log: # We don't expect the python version file to exist if we have no # pre-existing cache assert ( - 'Writing python-version file. ' - 'Cache compatibility status is currently unknown.' - ) in log + "Writing python-version file. " + "Cache compatibility status is currently unknown." in log + ) diff --git a/tests/integration_tests/modules/test_write_files.py b/tests/integration_tests/modules/test_write_files.py index 1d532fac..1eb7e945 100644 --- a/tests/integration_tests/modules/test_write_files.py +++ b/tests/integration_tests/modules/test_write_files.py @@ -7,8 +7,8 @@ and then checks if those files were created during boot. ``tests/cloud_tests/testcases/modules/write_files.yaml``.)""" import base64 -import pytest +import pytest ASCII_TEXT = "ASCII text" B64_CONTENT = base64.b64encode(ASCII_TEXT.encode("utf-8")) @@ -50,25 +50,30 @@ write_files: defer: true owner: 'myuser' permissions: '0644' -""".format(B64_CONTENT.decode("ascii")) +""".format( + B64_CONTENT.decode("ascii") +) @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestWriteFiles: - @pytest.mark.parametrize( - "cmd,expected_out", ( + "cmd,expected_out", + ( ("file /root/file_b64", ASCII_TEXT), ("md5sum </root/file_binary", "3801184b97bb8c6e63fa0e1eae2920d7"), - ("sha256sum </root/file_binary", ( + ( + "sha256sum </root/file_binary", "2c791c4037ea5bd7e928d6a87380f8ba" - "7a803cd83d5e4f269e28f5090f0f2c9a" - )), - ("file /root/file_gzip", - "POSIX shell script, ASCII text executable"), + "7a803cd83d5e4f269e28f5090f0f2c9a", + ), + ( + "file /root/file_gzip", + "POSIX shell script, ASCII text executable", + ), ("file /root/file_text", ASCII_TEXT), - ) + ), ) def test_write_files(self, cmd, expected_out, class_client): out = class_client.execute(cmd) @@ -82,6 +87,7 @@ class TestWriteFiles: """ out = class_client.read_from_file("/home/testuser/my-file") assert "echo 'hello world!'" == out - assert class_client.execute( - 'stat -c "%U %a" /home/testuser/my-file' - ) == 'myuser 644' + assert ( + class_client.execute('stat -c "%U %a" /home/testuser/my-file') + == "myuser 644" + ) diff --git a/tests/integration_tests/test_upgrade.py b/tests/integration_tests/test_upgrade.py index 0ba4754c..e53ea998 100644 --- a/tests/integration_tests/test_upgrade.py +++ b/tests/integration_tests/test_upgrade.py @@ -1,14 +1,14 @@ import json import logging import os + import pytest from tests.integration_tests.clouds import ImageSpecification, IntegrationCloud from tests.integration_tests.conftest import get_validated_source from tests.integration_tests.util import verify_clean_log - -LOG = logging.getLogger('integration_testing.test_upgrade') +LOG = logging.getLogger("integration_testing.test_upgrade") LOG_TEMPLATE = """\n\ === `systemd-analyze` before: @@ -46,8 +46,10 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud): if not source.installs_new_version(): pytest.skip(UNSUPPORTED_INSTALL_METHOD_MSG.format(source)) return # type checking doesn't understand that skip raises - if (ImageSpecification.from_os_image().release == 'bionic' and - session_cloud.settings.PLATFORM == 'lxd_vm'): + if ( + ImageSpecification.from_os_image().release == "bionic" + and session_cloud.settings.PLATFORM == "lxd_vm" + ): # The issues that we see on Bionic VMs don't appear anywhere # else, including when calling KVM directly. It likely has to # do with the extra lxd-agent setup happening on bionic. @@ -57,32 +59,34 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud): return launch_kwargs = { - 'image_id': session_cloud.released_image_id, + "image_id": session_cloud.released_image_id, } with session_cloud.launch( - launch_kwargs=launch_kwargs, user_data=USER_DATA, + launch_kwargs=launch_kwargs, + user_data=USER_DATA, ) as instance: # get pre values - pre_hostname = instance.execute('hostname') - pre_cloud_id = instance.execute('cloud-id') - pre_result = instance.execute('cat /run/cloud-init/result.json') - pre_network = instance.execute('cat /etc/netplan/50-cloud-init.yaml') - pre_systemd_analyze = instance.execute('systemd-analyze') - pre_systemd_blame = instance.execute('systemd-analyze blame') - pre_cloud_analyze = instance.execute('cloud-init analyze show') - pre_cloud_blame = instance.execute('cloud-init analyze blame') + pre_hostname = instance.execute("hostname") + pre_cloud_id = instance.execute("cloud-id") + pre_result = instance.execute("cat /run/cloud-init/result.json") + pre_network = instance.execute("cat /etc/netplan/50-cloud-init.yaml") + pre_systemd_analyze = instance.execute("systemd-analyze") + pre_systemd_blame = instance.execute("systemd-analyze blame") + pre_cloud_analyze = instance.execute("cloud-init analyze show") + pre_cloud_blame = instance.execute("cloud-init analyze blame") # Ensure no issues pre-upgrade - log = instance.read_from_file('/var/log/cloud-init.log') - assert not json.loads(pre_result)['v1']['errors'] + log = instance.read_from_file("/var/log/cloud-init.log") + assert not json.loads(pre_result)["v1"]["errors"] try: verify_clean_log(log) except AssertionError: LOG.warning( - 'There were errors/warnings/tracebacks pre-upgrade. ' - 'Any failures may be due to pre-upgrade problem') + "There were errors/warnings/tracebacks pre-upgrade. " + "Any failures may be due to pre-upgrade problem" + ) # Upgrade instance.install_new_cloud_init(source, take_snapshot=False) @@ -91,27 +95,27 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud): # have broken across re-constitution of a cached datasource. Some # platforms invalidate their datasource cache on reboot, so we run # it here to ensure we get a dirty run. - assert instance.execute('cloud-init init').ok + assert instance.execute("cloud-init init").ok # Reboot - instance.execute('hostname something-else') + instance.execute("hostname something-else") instance.restart() - assert instance.execute('cloud-init status --wait --long').ok + assert instance.execute("cloud-init status --wait --long").ok # get post values - post_hostname = instance.execute('hostname') - post_cloud_id = instance.execute('cloud-id') - post_result = instance.execute('cat /run/cloud-init/result.json') - post_network = instance.execute('cat /etc/netplan/50-cloud-init.yaml') - post_systemd_analyze = instance.execute('systemd-analyze') - post_systemd_blame = instance.execute('systemd-analyze blame') - post_cloud_analyze = instance.execute('cloud-init analyze show') - post_cloud_blame = instance.execute('cloud-init analyze blame') + post_hostname = instance.execute("hostname") + post_cloud_id = instance.execute("cloud-id") + post_result = instance.execute("cat /run/cloud-init/result.json") + post_network = instance.execute("cat /etc/netplan/50-cloud-init.yaml") + post_systemd_analyze = instance.execute("systemd-analyze") + post_systemd_blame = instance.execute("systemd-analyze blame") + post_cloud_analyze = instance.execute("cloud-init analyze show") + post_cloud_blame = instance.execute("cloud-init analyze blame") # Ensure no issues post-upgrade - assert not json.loads(pre_result)['v1']['errors'] + assert not json.loads(pre_result)["v1"]["errors"] - log = instance.read_from_file('/var/log/cloud-init.log') + log = instance.read_from_file("/var/log/cloud-init.log") verify_clean_log(log) # Ensure important things stayed the same @@ -120,36 +124,46 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud): try: assert pre_result == post_result except AssertionError: - if instance.settings.PLATFORM == 'azure': + if instance.settings.PLATFORM == "azure": pre_json = json.loads(pre_result) post_json = json.loads(post_result) - assert pre_json['v1']['datasource'].startswith( - 'DataSourceAzure') - assert post_json['v1']['datasource'].startswith( - 'DataSourceAzure') + assert pre_json["v1"]["datasource"].startswith( + "DataSourceAzure" + ) + assert post_json["v1"]["datasource"].startswith( + "DataSourceAzure" + ) assert pre_network == post_network # Calculate and log all the boot numbers pre_analyze_totals = [ - x for x in pre_cloud_analyze.splitlines() - if x.startswith('Finished stage') or x.startswith('Total Time') + x + for x in pre_cloud_analyze.splitlines() + if x.startswith("Finished stage") or x.startswith("Total Time") ] post_analyze_totals = [ - x for x in post_cloud_analyze.splitlines() - if x.startswith('Finished stage') or x.startswith('Total Time') + x + for x in post_cloud_analyze.splitlines() + if x.startswith("Finished stage") or x.startswith("Total Time") ] # pylint: disable=logging-format-interpolation - LOG.info(LOG_TEMPLATE.format( - pre_systemd_analyze=pre_systemd_analyze, - post_systemd_analyze=post_systemd_analyze, - pre_systemd_blame='\n'.join(pre_systemd_blame.splitlines()[:10]), - post_systemd_blame='\n'.join(post_systemd_blame.splitlines()[:10]), - pre_analyze_totals='\n'.join(pre_analyze_totals), - post_analyze_totals='\n'.join(post_analyze_totals), - pre_cloud_blame='\n'.join(pre_cloud_blame.splitlines()[:10]), - post_cloud_blame='\n'.join(post_cloud_blame.splitlines()[:10]), - )) + LOG.info( + LOG_TEMPLATE.format( + pre_systemd_analyze=pre_systemd_analyze, + post_systemd_analyze=post_systemd_analyze, + pre_systemd_blame="\n".join( + pre_systemd_blame.splitlines()[:10] + ), + post_systemd_blame="\n".join( + post_systemd_blame.splitlines()[:10] + ), + pre_analyze_totals="\n".join(pre_analyze_totals), + post_analyze_totals="\n".join(post_analyze_totals), + pre_cloud_blame="\n".join(pre_cloud_blame.splitlines()[:10]), + post_cloud_blame="\n".join(post_cloud_blame.splitlines()[:10]), + ) + ) @pytest.mark.ci @@ -157,18 +171,18 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud): def test_subsequent_boot_of_upgraded_package(session_cloud: IntegrationCloud): source = get_validated_source(session_cloud) if not source.installs_new_version(): - if os.environ.get('TRAVIS'): + if os.environ.get("TRAVIS"): # If this isn't running on CI, we should know pytest.fail(UNSUPPORTED_INSTALL_METHOD_MSG.format(source)) else: pytest.skip(UNSUPPORTED_INSTALL_METHOD_MSG.format(source)) return # type checking doesn't understand that skip raises - launch_kwargs = {'image_id': session_cloud.released_image_id} + launch_kwargs = {"image_id": session_cloud.released_image_id} with session_cloud.launch(launch_kwargs=launch_kwargs) as instance: instance.install_new_cloud_init( source, take_snapshot=False, clean=False ) instance.restart() - assert instance.execute('cloud-init status --wait --long').ok + assert instance.execute("cloud-init status --wait --long").ok diff --git a/tests/integration_tests/util.py b/tests/integration_tests/util.py index e40d80fe..31fe69c0 100644 --- a/tests/integration_tests/util.py +++ b/tests/integration_tests/util.py @@ -3,16 +3,15 @@ import logging import multiprocessing import os import time -from contextlib import contextmanager from collections import namedtuple +from contextlib import contextmanager from pathlib import Path +log = logging.getLogger("integration_testing") +key_pair = namedtuple("key_pair", "public_key private_key") -log = logging.getLogger('integration_testing') -key_pair = namedtuple('key_pair', 'public_key private_key') - -ASSETS_DIR = Path('tests/integration_tests/assets') -KEY_PATH = ASSETS_DIR / 'keys' +ASSETS_DIR = Path("tests/integration_tests/assets") +KEY_PATH = ASSETS_DIR / "keys" def verify_ordered_items_in_text(to_verify: list, text: str): @@ -30,26 +29,27 @@ def verify_ordered_items_in_text(to_verify: list, text: str): def verify_clean_log(log): """Assert no unexpected tracebacks or warnings in logs""" - warning_count = log.count('WARN') + warning_count = log.count("WARN") expected_warnings = 0 - traceback_count = log.count('Traceback') + traceback_count = log.count("Traceback") expected_tracebacks = 0 warning_texts = [ # Consistently on all Azure launches: # azure.py[WARNING]: No lease found; using default endpoint - 'No lease found; using default endpoint' + "No lease found; using default endpoint" ] traceback_texts = [] - if 'oracle' in log: + if "oracle" in log: # LP: #1842752 - lease_exists_text = 'Stderr: RTNETLINK answers: File exists' + lease_exists_text = "Stderr: RTNETLINK answers: File exists" warning_texts.append(lease_exists_text) traceback_texts.append(lease_exists_text) # LP: #1833446 fetch_error_text = ( - 'UrlError: 404 Client Error: Not Found for url: ' - 'http://169.254.169.254/latest/meta-data/') + "UrlError: 404 Client Error: Not Found for url: " + "http://169.254.169.254/latest/meta-data/" + ) warning_texts.append(fetch_error_text) traceback_texts.append(fetch_error_text) # Oracle has a file in /etc/cloud/cloud.cfg.d that contains @@ -59,7 +59,7 @@ def verify_clean_log(log): # ssh_redirect_user: true # This can trigger a warning about opc having no public key warning_texts.append( - 'Unable to disable SSH logins for opc given ssh_redirect_user' + "Unable to disable SSH logins for opc given ssh_redirect_user" ) for warning_text in warning_texts: @@ -82,7 +82,7 @@ def emit_dots_on_travis(): It should be wrapped selectively around operations that are known to take a long time. """ - if os.environ.get('TRAVIS') != "true": + if os.environ.get("TRAVIS") != "true": # If we aren't on Travis, don't do anything. yield return @@ -100,9 +100,9 @@ def emit_dots_on_travis(): dot_process.terminate() -def get_test_rsa_keypair(key_name: str = 'test1') -> key_pair: - private_key_path = KEY_PATH / 'id_rsa.{}'.format(key_name) - public_key_path = KEY_PATH / 'id_rsa.{}.pub'.format(key_name) +def get_test_rsa_keypair(key_name: str = "test1") -> key_pair: + private_key_path = KEY_PATH / "id_rsa.{}".format(key_name) + public_key_path = KEY_PATH / "id_rsa.{}.pub".format(key_name) with public_key_path.open() as public_file: public_key = public_file.read() with private_key_path.open() as private_file: @@ -121,6 +121,7 @@ def retry(*, tries: int = 30, delay: int = 1): def try_something_that_may_not_be_ready(): ... """ + def _retry(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -135,5 +136,7 @@ def retry(*, tries: int = 30, delay: int = 1): else: if last_error: raise last_error + return wrapper + return _retry |