summaryrefslogtreecommitdiff
path: root/tests/integration_tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration_tests')
-rw-r--r--tests/integration_tests/__init__.py8
-rw-r--r--tests/integration_tests/bugs/test_gh570.py13
-rw-r--r--tests/integration_tests/bugs/test_gh626.py25
-rw-r--r--tests/integration_tests/bugs/test_gh632.py20
-rw-r--r--tests/integration_tests/bugs/test_gh668.py15
-rw-r--r--tests/integration_tests/bugs/test_gh671.py35
-rw-r--r--tests/integration_tests/bugs/test_gh868.py3
-rw-r--r--tests/integration_tests/bugs/test_lp1813396.py3
-rw-r--r--tests/integration_tests/bugs/test_lp1835584.py19
-rw-r--r--tests/integration_tests/bugs/test_lp1886531.py2
-rw-r--r--tests/integration_tests/bugs/test_lp1897099.py13
-rw-r--r--tests/integration_tests/bugs/test_lp1898997.py14
-rw-r--r--tests/integration_tests/bugs/test_lp1900837.py2
-rw-r--r--tests/integration_tests/bugs/test_lp1901011.py49
-rw-r--r--tests/integration_tests/bugs/test_lp1910835.py1
-rw-r--r--tests/integration_tests/bugs/test_lp1912844.py4
-rw-r--r--tests/integration_tests/clouds.py163
-rw-r--r--tests/integration_tests/conftest.py130
-rw-r--r--tests/integration_tests/datasources/test_lxd_discovery.py43
-rw-r--r--tests/integration_tests/datasources/test_network_dependency.py17
-rw-r--r--tests/integration_tests/instances.py83
-rw-r--r--tests/integration_tests/integration_settings.py13
-rw-r--r--tests/integration_tests/modules/test_apt.py88
-rw-r--r--tests/integration_tests/modules/test_ca_certs.py1
-rw-r--r--tests/integration_tests/modules/test_cli.py9
-rw-r--r--tests/integration_tests/modules/test_combined.py155
-rw-r--r--tests/integration_tests/modules/test_command_output.py5
-rw-r--r--tests/integration_tests/modules/test_disk_setup.py76
-rw-r--r--tests/integration_tests/modules/test_growpart.py38
-rw-r--r--tests/integration_tests/modules/test_hotplug.py55
-rw-r--r--tests/integration_tests/modules/test_jinja_templating.py11
-rw-r--r--tests/integration_tests/modules/test_keys_to_console.py9
-rw-r--r--tests/integration_tests/modules/test_lxd_bridge.py2
-rw-r--r--tests/integration_tests/modules/test_ntp_servers.py30
-rw-r--r--tests/integration_tests/modules/test_package_update_upgrade_install.py18
-rw-r--r--tests/integration_tests/modules/test_persistence.py26
-rw-r--r--tests/integration_tests/modules/test_power_state_change.py48
-rw-r--r--tests/integration_tests/modules/test_puppet.py6
-rw-r--r--tests/integration_tests/modules/test_set_hostname.py10
-rw-r--r--tests/integration_tests/modules/test_set_password.py15
-rw-r--r--tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py13
-rw-r--r--tests/integration_tests/modules/test_ssh_generate.py16
-rw-r--r--tests/integration_tests/modules/test_ssh_keys_provided.py58
-rw-r--r--tests/integration_tests/modules/test_ssh_keysfile.py159
-rw-r--r--tests/integration_tests/modules/test_user_events.py50
-rw-r--r--tests/integration_tests/modules/test_users_groups.py21
-rw-r--r--tests/integration_tests/modules/test_version_change.py45
-rw-r--r--tests/integration_tests/modules/test_write_files.py32
-rw-r--r--tests/integration_tests/test_upgrade.py120
-rw-r--r--tests/integration_tests/util.py39
50 files changed, 976 insertions, 854 deletions
diff --git a/tests/integration_tests/__init__.py b/tests/integration_tests/__init__.py
index e1d4cd28..81f9b02f 100644
--- a/tests/integration_tests/__init__.py
+++ b/tests/integration_tests/__init__.py
@@ -7,6 +7,8 @@ def random_mac_address() -> str:
The MAC address will have a 1 in its least significant bit, indicating it
to be a locally administered address.
"""
- return "02:00:00:%02x:%02x:%02x" % (random.randint(0, 255),
- random.randint(0, 255),
- random.randint(0, 255))
+ return "02:00:00:%02x:%02x:%02x" % (
+ random.randint(0, 255),
+ random.randint(0, 255),
+ random.randint(0, 255),
+ )
diff --git a/tests/integration_tests/bugs/test_gh570.py b/tests/integration_tests/bugs/test_gh570.py
index 534cfb9a..ddc74503 100644
--- a/tests/integration_tests/bugs/test_gh570.py
+++ b/tests/integration_tests/bugs/test_gh570.py
@@ -4,9 +4,10 @@ Test that we can add optional vendor-data to the seedfrom file in a
NoCloud environment
"""
-from tests.integration_tests.instances import IntegrationInstance
import pytest
+from tests.integration_tests.instances import IntegrationInstance
+
VENDOR_DATA = """\
#cloud-config
runcmd:
@@ -19,7 +20,7 @@ runcmd:
@pytest.mark.lxd_container
@pytest.mark.lxd_vm
def test_nocloud_seedfrom_vendordata(client: IntegrationInstance):
- seed_dir = '/var/tmp/test_seed_dir'
+ seed_dir = "/var/tmp/test_seed_dir"
result = client.execute(
"mkdir {seed_dir} && "
"touch {seed_dir}/user-data && "
@@ -30,10 +31,10 @@ def test_nocloud_seedfrom_vendordata(client: IntegrationInstance):
assert result.return_code == 0
client.write_to_file(
- '{}/vendor-data'.format(seed_dir),
+ "{}/vendor-data".format(seed_dir),
VENDOR_DATA,
)
- client.execute('cloud-init clean --logs')
+ client.execute("cloud-init clean --logs")
client.restart()
- assert client.execute('cloud-init status').ok
- assert 'seeded_vendordata_test_file' in client.execute('ls /var/tmp')
+ assert client.execute("cloud-init status").ok
+ assert "seeded_vendordata_test_file" in client.execute("ls /var/tmp")
diff --git a/tests/integration_tests/bugs/test_gh626.py b/tests/integration_tests/bugs/test_gh626.py
index dba01b34..7c720143 100644
--- a/tests/integration_tests/bugs/test_gh626.py
+++ b/tests/integration_tests/bugs/test_gh626.py
@@ -11,7 +11,6 @@ from tests.integration_tests import random_mac_address
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
-
MAC_ADDRESS = random_mac_address()
NETWORK_CONFIG = """\
version: 2
@@ -21,7 +20,9 @@ ethernets:
wakeonlan: true
match:
macaddress: {}
-""".format(MAC_ADDRESS)
+""".format(
+ MAC_ADDRESS
+)
EXPECTED_ENI_END = """\
iface eth0 inet dhcp
@@ -31,17 +32,19 @@ iface eth0 inet dhcp
@pytest.mark.sru_2020_11
@pytest.mark.lxd_container
@pytest.mark.lxd_vm
-@pytest.mark.lxd_config_dict({
- 'user.network-config': NETWORK_CONFIG,
- "volatile.eth0.hwaddr": MAC_ADDRESS,
-})
+@pytest.mark.lxd_config_dict(
+ {
+ "user.network-config": NETWORK_CONFIG,
+ "volatile.eth0.hwaddr": MAC_ADDRESS,
+ }
+)
def test_wakeonlan(client: IntegrationInstance):
- if ImageSpecification.from_os_image().release == 'xenial':
- eni = client.execute('cat /etc/network/interfaces.d/50-cloud-init.cfg')
+ if ImageSpecification.from_os_image().release == "xenial":
+ eni = client.execute("cat /etc/network/interfaces.d/50-cloud-init.cfg")
assert eni.endswith(EXPECTED_ENI_END)
return
- netplan_cfg = client.execute('cat /etc/netplan/50-cloud-init.yaml')
+ netplan_cfg = client.execute("cat /etc/netplan/50-cloud-init.yaml")
netplan_yaml = yaml.safe_load(netplan_cfg)
- assert 'wakeonlan' in netplan_yaml['network']['ethernets']['eth0']
- assert netplan_yaml['network']['ethernets']['eth0']['wakeonlan'] is True
+ assert "wakeonlan" in netplan_yaml["network"]["ethernets"]["eth0"]
+ assert netplan_yaml["network"]["ethernets"]["eth0"]["wakeonlan"] is True
diff --git a/tests/integration_tests/bugs/test_gh632.py b/tests/integration_tests/bugs/test_gh632.py
index f3702a2e..c7a897c6 100644
--- a/tests/integration_tests/bugs/test_gh632.py
+++ b/tests/integration_tests/bugs/test_gh632.py
@@ -14,18 +14,20 @@ from tests.integration_tests.util import verify_clean_log
@pytest.mark.lxd_vm
def test_datasource_rbx_no_stacktrace(client: IntegrationInstance):
client.write_to_file(
- '/etc/cloud/cloud.cfg.d/90_dpkg.cfg',
- 'datasource_list: [ RbxCloud, NoCloud ]\n',
+ "/etc/cloud/cloud.cfg.d/90_dpkg.cfg",
+ "datasource_list: [ RbxCloud, NoCloud ]\n",
)
client.write_to_file(
- '/etc/cloud/ds-identify.cfg',
- 'policy: enabled\n',
+ "/etc/cloud/ds-identify.cfg",
+ "policy: enabled\n",
)
- client.execute('cloud-init clean --logs')
+ client.execute("cloud-init clean --logs")
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
- assert 'Failed to load metadata and userdata' not in log
- assert ("Getting data from <class 'cloudinit.sources.DataSourceRbxCloud."
- "DataSourceRbxCloud'> failed") not in log
+ assert "Failed to load metadata and userdata" not in log
+ assert (
+ "Getting data from <class 'cloudinit.sources.DataSourceRbxCloud."
+ "DataSourceRbxCloud'> failed" not in log
+ )
diff --git a/tests/integration_tests/bugs/test_gh668.py b/tests/integration_tests/bugs/test_gh668.py
index ce57052e..95edb48d 100644
--- a/tests/integration_tests/bugs/test_gh668.py
+++ b/tests/integration_tests/bugs/test_gh668.py
@@ -10,7 +10,6 @@ import pytest
from tests.integration_tests import random_mac_address
from tests.integration_tests.instances import IntegrationInstance
-
DESTINATION_IP = "172.16.0.10"
GATEWAY_IP = "10.0.0.100"
MAC_ADDRESS = random_mac_address()
@@ -26,17 +25,21 @@ ethernets:
via: {}
match:
macaddress: {}
-""".format(DESTINATION_IP, GATEWAY_IP, MAC_ADDRESS)
+""".format(
+ DESTINATION_IP, GATEWAY_IP, MAC_ADDRESS
+)
EXPECTED_ROUTE = "{} via {}".format(DESTINATION_IP, GATEWAY_IP)
@pytest.mark.lxd_container
@pytest.mark.lxd_vm
-@pytest.mark.lxd_config_dict({
- "user.network-config": NETWORK_CONFIG,
- "volatile.eth0.hwaddr": MAC_ADDRESS,
-})
+@pytest.mark.lxd_config_dict(
+ {
+ "user.network-config": NETWORK_CONFIG,
+ "volatile.eth0.hwaddr": MAC_ADDRESS,
+ }
+)
@pytest.mark.lxd_use_exec
def test_static_route_to_host(client: IntegrationInstance):
route = client.execute("ip route | grep {}".format(DESTINATION_IP))
diff --git a/tests/integration_tests/bugs/test_gh671.py b/tests/integration_tests/bugs/test_gh671.py
index 5e90cdda..15f204ee 100644
--- a/tests/integration_tests/bugs/test_gh671.py
+++ b/tests/integration_tests/bugs/test_gh671.py
@@ -11,13 +11,13 @@ import pytest
from tests.integration_tests.clouds import IntegrationCloud
-OLD_PASSWORD = 'DoIM33tTheComplexityRequirements!??'
-NEW_PASSWORD = 'DoIM33tTheComplexityRequirementsNow!??'
+OLD_PASSWORD = "DoIM33tTheComplexityRequirements!??"
+NEW_PASSWORD = "DoIM33tTheComplexityRequirementsNow!??"
def _check_password(instance, unhashed_password):
- shadow_password = instance.execute('getent shadow ubuntu').split(':')[1]
- salt = shadow_password.rsplit('$', 1)[0]
+ shadow_password = instance.execute("getent shadow ubuntu").split(":")[1]
+ salt = shadow_password.rsplit("$", 1)[0]
hashed_password = crypt.crypt(unhashed_password, salt)
assert shadow_password == hashed_password
@@ -26,29 +26,28 @@ def _check_password(instance, unhashed_password):
@pytest.mark.sru_2020_11
def test_update_default_password(setup_image, session_cloud: IntegrationCloud):
os_profile = {
- 'os_profile': {
- 'admin_password': '',
- 'linux_configuration': {
- 'disable_password_authentication': False
- }
+ "os_profile": {
+ "admin_password": "",
+ "linux_configuration": {"disable_password_authentication": False},
}
}
- os_profile['os_profile']['admin_password'] = OLD_PASSWORD
- instance1 = session_cloud.launch(launch_kwargs={'vm_params': os_profile})
+ os_profile["os_profile"]["admin_password"] = OLD_PASSWORD
+ instance1 = session_cloud.launch(launch_kwargs={"vm_params": os_profile})
_check_password(instance1, OLD_PASSWORD)
snapshot_id = instance1.cloud.cloud_instance.snapshot(
- instance1.instance,
- delete_provisioned_user=False
+ instance1.instance, delete_provisioned_user=False
)
- os_profile['os_profile']['admin_password'] = NEW_PASSWORD
+ os_profile["os_profile"]["admin_password"] = NEW_PASSWORD
try:
- with session_cloud.launch(launch_kwargs={
- 'image_id': snapshot_id,
- 'vm_params': os_profile,
- }) as instance2:
+ with session_cloud.launch(
+ launch_kwargs={
+ "image_id": snapshot_id,
+ "vm_params": os_profile,
+ }
+ ) as instance2:
_check_password(instance2, NEW_PASSWORD)
finally:
session_cloud.cloud_instance.delete_image(snapshot_id)
diff --git a/tests/integration_tests/bugs/test_gh868.py b/tests/integration_tests/bugs/test_gh868.py
index 1119d461..a62e8b36 100644
--- a/tests/integration_tests/bugs/test_gh868.py
+++ b/tests/integration_tests/bugs/test_gh868.py
@@ -4,7 +4,6 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import verify_clean_log
-
USERDATA = """\
#cloud-config
chef:
@@ -24,5 +23,5 @@ chef:
@pytest.mark.lxd_vm
@pytest.mark.user_data(USERDATA)
def test_chef_license(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
diff --git a/tests/integration_tests/bugs/test_lp1813396.py b/tests/integration_tests/bugs/test_lp1813396.py
index 27d41c2b..451a9972 100644
--- a/tests/integration_tests/bugs/test_lp1813396.py
+++ b/tests/integration_tests/bugs/test_lp1813396.py
@@ -8,7 +8,6 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import verify_ordered_items_in_text
-
USER_DATA = """\
#cloud-config
apt:
@@ -23,7 +22,7 @@ apt:
@pytest.mark.sru_2020_11
@pytest.mark.user_data(USER_DATA)
def test_gpg_no_tty(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
to_verify = [
"Running command ['gpg', '--no-tty', "
"'--keyserver=keyserver.ubuntu.com', '--recv-keys', 'E4D304DF'] "
diff --git a/tests/integration_tests/bugs/test_lp1835584.py b/tests/integration_tests/bugs/test_lp1835584.py
index 732f2179..a800eab4 100644
--- a/tests/integration_tests/bugs/test_lp1835584.py
+++ b/tests/integration_tests/bugs/test_lp1835584.py
@@ -31,12 +31,9 @@ import re
import pytest
-from tests.integration_tests.instances import IntegrationAzureInstance
-from tests.integration_tests.clouds import (
- ImageSpecification, IntegrationCloud
-)
+from tests.integration_tests.clouds import ImageSpecification, IntegrationCloud
from tests.integration_tests.conftest import get_validated_source
-
+from tests.integration_tests.instances import IntegrationAzureInstance
IMG_AZURE_UBUNTU_PRO_FIPS_BIONIC = (
"Canonical:0001-com-ubuntu-pro-bionic-fips:pro-fips-18_04:18.04.202010201"
@@ -44,14 +41,12 @@ IMG_AZURE_UBUNTU_PRO_FIPS_BIONIC = (
def _check_iid_insensitive_across_kernel_upgrade(
- instance: IntegrationAzureInstance
+ instance: IntegrationAzureInstance,
):
uuid = instance.read_from_file("/sys/class/dmi/id/product_uuid")
- assert uuid.isupper(), (
- "Expected uppercase UUID on Ubuntu FIPS image {}".format(
- uuid
- )
- )
+ assert (
+ uuid.isupper()
+ ), "Expected uppercase UUID on Ubuntu FIPS image {}".format(uuid)
orig_kernel = instance.execute("uname -r").strip()
assert "azure-fips" in orig_kernel
result = instance.execute("apt-get update")
@@ -80,7 +75,7 @@ def _check_iid_insensitive_across_kernel_upgrade(
@pytest.mark.azure
@pytest.mark.sru_next
def test_azure_kernel_upgrade_case_insensitive_uuid(
- session_cloud: IntegrationCloud
+ session_cloud: IntegrationCloud,
):
cfg_image_spec = ImageSpecification.from_os_image()
if (cfg_image_spec.os, cfg_image_spec.release) != ("ubuntu", "bionic"):
diff --git a/tests/integration_tests/bugs/test_lp1886531.py b/tests/integration_tests/bugs/test_lp1886531.py
index 6dd61222..d56ca320 100644
--- a/tests/integration_tests/bugs/test_lp1886531.py
+++ b/tests/integration_tests/bugs/test_lp1886531.py
@@ -13,7 +13,6 @@ import pytest
from tests.integration_tests.util import verify_clean_log
-
USER_DATA = """\
#cloud-config
bootcmd:
@@ -22,7 +21,6 @@ bootcmd:
class TestLp1886531:
-
@pytest.mark.user_data(USER_DATA)
def test_lp1886531(self, client):
log_content = client.read_from_file("/var/log/cloud-init.log")
diff --git a/tests/integration_tests/bugs/test_lp1897099.py b/tests/integration_tests/bugs/test_lp1897099.py
index 27c8927f..876a2887 100644
--- a/tests/integration_tests/bugs/test_lp1897099.py
+++ b/tests/integration_tests/bugs/test_lp1897099.py
@@ -7,7 +7,6 @@ https://bugs.launchpad.net/cloud-init/+bug/1897099
import pytest
-
USER_DATA = """\
#cloud-config
bootcmd:
@@ -21,11 +20,11 @@ swap:
@pytest.mark.sru_2020_11
@pytest.mark.user_data(USER_DATA)
-@pytest.mark.no_container('Containers cannot configure swap')
+@pytest.mark.no_container("Containers cannot configure swap")
def test_fallocate_fallback(client):
- log = client.read_from_file('/var/log/cloud-init.log')
- assert '/swap.img' in client.execute('cat /proc/swaps')
- assert '/swap.img' in client.execute('cat /etc/fstab')
- assert 'fallocate swap creation failed, will attempt with dd' in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "/swap.img" in client.execute("cat /proc/swaps")
+ assert "/swap.img" in client.execute("cat /etc/fstab")
+ assert "fallocate swap creation failed, will attempt with dd" in log
assert "Running command ['dd', 'if=/dev/zero', 'of=/swap.img'" in log
- assert 'SUCCESS: config-mounts ran successfully' in log
+ assert "SUCCESS: config-mounts ran successfully" in log
diff --git a/tests/integration_tests/bugs/test_lp1898997.py b/tests/integration_tests/bugs/test_lp1898997.py
index 909bc690..115bd34f 100644
--- a/tests/integration_tests/bugs/test_lp1898997.py
+++ b/tests/integration_tests/bugs/test_lp1898997.py
@@ -33,13 +33,17 @@ ethernets:
match:
macaddress: {}
version: 2
-""".format(MAC_ADDRESS)
+""".format(
+ MAC_ADDRESS
+)
-@pytest.mark.lxd_config_dict({
- "user.network-config": NETWORK_CONFIG,
- "volatile.eth0.hwaddr": MAC_ADDRESS,
-})
+@pytest.mark.lxd_config_dict(
+ {
+ "user.network-config": NETWORK_CONFIG,
+ "volatile.eth0.hwaddr": MAC_ADDRESS,
+ }
+)
@pytest.mark.lxd_vm
@pytest.mark.lxd_use_exec
@pytest.mark.not_bionic
diff --git a/tests/integration_tests/bugs/test_lp1900837.py b/tests/integration_tests/bugs/test_lp1900837.py
index fcc2b751..3df10883 100644
--- a/tests/integration_tests/bugs/test_lp1900837.py
+++ b/tests/integration_tests/bugs/test_lp1900837.py
@@ -23,7 +23,7 @@ class TestLogPermissionsNotResetOnReboot:
# Reboot
client.restart()
- assert client.execute('cloud-init status').ok
+ assert client.execute("cloud-init status").ok
# Check that permissions are not reset on reboot
assert "600" == _get_log_perms(client)
diff --git a/tests/integration_tests/bugs/test_lp1901011.py b/tests/integration_tests/bugs/test_lp1901011.py
index 2b47f0a8..7de8bd77 100644
--- a/tests/integration_tests/bugs/test_lp1901011.py
+++ b/tests/integration_tests/bugs/test_lp1901011.py
@@ -10,12 +10,16 @@ from tests.integration_tests.clouds import IntegrationCloud
@pytest.mark.azure
-@pytest.mark.parametrize('instance_type,is_ephemeral', [
- ('Standard_DS1_v2', True),
- ('Standard_D2s_v4', False),
-])
-def test_ephemeral(instance_type, is_ephemeral,
- session_cloud: IntegrationCloud, setup_image):
+@pytest.mark.parametrize(
+ "instance_type,is_ephemeral",
+ [
+ ("Standard_DS1_v2", True),
+ ("Standard_D2s_v4", False),
+ ],
+)
+def test_ephemeral(
+ instance_type, is_ephemeral, session_cloud: IntegrationCloud, setup_image
+):
if is_ephemeral:
expected_log = (
"Ephemeral resource disk '/dev/disk/cloud/azure_resource' exists. "
@@ -29,30 +33,35 @@ def test_ephemeral(instance_type, is_ephemeral,
)
with session_cloud.launch(
- launch_kwargs={'instance_type': instance_type}
+ launch_kwargs={"instance_type": instance_type}
) as client:
# Verify log file
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
assert expected_log in log
# Verify devices
- dev_links = client.execute('ls /dev/disk/cloud')
- assert 'azure_root' in dev_links
- assert 'azure_root-part1' in dev_links
+ dev_links = client.execute("ls /dev/disk/cloud")
+ assert "azure_root" in dev_links
+ assert "azure_root-part1" in dev_links
if is_ephemeral:
- assert 'azure_resource' in dev_links
- assert 'azure_resource-part1' in dev_links
+ assert "azure_resource" in dev_links
+ assert "azure_resource-part1" in dev_links
# Verify mounts
- blks = client.execute('lsblk -pPo NAME,TYPE,MOUNTPOINT')
+ blks = client.execute("lsblk -pPo NAME,TYPE,MOUNTPOINT")
root_device = client.execute(
- 'realpath /dev/disk/cloud/azure_root-part1'
+ "realpath /dev/disk/cloud/azure_root-part1"
+ )
+ assert (
+ 'NAME="{}" TYPE="part" MOUNTPOINT="/"'.format(root_device) in blks
)
- assert 'NAME="{}" TYPE="part" MOUNTPOINT="/"'.format(
- root_device) in blks
if is_ephemeral:
ephemeral_device = client.execute(
- 'realpath /dev/disk/cloud/azure_resource-part1'
+ "realpath /dev/disk/cloud/azure_resource-part1"
+ )
+ assert (
+ 'NAME="{}" TYPE="part" MOUNTPOINT="/mnt"'.format(
+ ephemeral_device
+ )
+ in blks
)
- assert 'NAME="{}" TYPE="part" MOUNTPOINT="/mnt"'.format(
- ephemeral_device) in blks
diff --git a/tests/integration_tests/bugs/test_lp1910835.py b/tests/integration_tests/bugs/test_lp1910835.py
index 87f92d5e..ddd996f9 100644
--- a/tests/integration_tests/bugs/test_lp1910835.py
+++ b/tests/integration_tests/bugs/test_lp1910835.py
@@ -19,7 +19,6 @@ will match.
"""
import pytest
-
USER_DATA_TMPL = """\
#cloud-config
ssh_authorized_keys:
diff --git a/tests/integration_tests/bugs/test_lp1912844.py b/tests/integration_tests/bugs/test_lp1912844.py
index efafae50..55511ed2 100644
--- a/tests/integration_tests/bugs/test_lp1912844.py
+++ b/tests/integration_tests/bugs/test_lp1912844.py
@@ -51,7 +51,9 @@ vlans:
id: 200
link: ovs-br
mtu: 1500
-""".format(MAC_ADDRESS)
+""".format(
+ MAC_ADDRESS
+)
SETUP_USER_DATA = """\
diff --git a/tests/integration_tests/clouds.py b/tests/integration_tests/clouds.py
index dee2adff..d4670bac 100644
--- a/tests/integration_tests/clouds.py
+++ b/tests/integration_tests/clouds.py
@@ -1,17 +1,17 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from abc import ABC, abstractmethod
import datetime
import logging
import os.path
import random
import string
+from abc import ABC, abstractmethod
from uuid import UUID
from pycloudlib import (
EC2,
GCE,
- Azure,
OCI,
+ Azure,
LXDContainer,
LXDVirtualMachine,
Openstack,
@@ -19,14 +19,15 @@ from pycloudlib import (
from pycloudlib.lxd.instance import LXDInstance
import cloudinit
-from cloudinit.subp import subp, ProcessExecutionError
+from cloudinit.subp import ProcessExecutionError, subp
from tests.integration_tests import integration_settings
from tests.integration_tests.instances import (
+ IntegrationAzureInstance,
IntegrationEc2Instance,
IntegrationGceInstance,
- IntegrationAzureInstance, IntegrationInstance,
- IntegrationOciInstance,
+ IntegrationInstance,
IntegrationLxdInstance,
+ IntegrationOciInstance,
)
from tests.integration_tests.util import emit_dots_on_travis
@@ -36,7 +37,7 @@ except ImportError:
pass
-log = logging.getLogger('integration_testing')
+log = logging.getLogger("integration_testing")
def _get_ubuntu_series() -> list:
@@ -149,41 +150,48 @@ class IntegrationCloud(ABC):
pycloudlib_instance = self.cloud_instance.launch(**launch_kwargs)
return pycloudlib_instance
- def launch(self, user_data=None, launch_kwargs=None,
- settings=integration_settings, **kwargs):
+ def launch(
+ self,
+ user_data=None,
+ launch_kwargs=None,
+ settings=integration_settings,
+ **kwargs
+ ):
if launch_kwargs is None:
launch_kwargs = {}
if self.settings.EXISTING_INSTANCE_ID:
log.info(
- 'Not launching instance due to EXISTING_INSTANCE_ID. '
- 'Instance id: %s', self.settings.EXISTING_INSTANCE_ID)
+ "Not launching instance due to EXISTING_INSTANCE_ID. "
+ "Instance id: %s",
+ self.settings.EXISTING_INSTANCE_ID,
+ )
self.instance = self.cloud_instance.get_instance(
self.settings.EXISTING_INSTANCE_ID
)
return self.instance
default_launch_kwargs = {
- 'image_id': self.image_id,
- 'user_data': user_data,
+ "image_id": self.image_id,
+ "user_data": user_data,
}
launch_kwargs = {**default_launch_kwargs, **launch_kwargs}
log.info(
"Launching instance with launch_kwargs:\n%s",
- "\n".join("{}={}".format(*item) for item in launch_kwargs.items())
+ "\n".join("{}={}".format(*item) for item in launch_kwargs.items()),
)
with emit_dots_on_travis():
pycloudlib_instance = self._perform_launch(launch_kwargs, **kwargs)
- log.info('Launched instance: %s', pycloudlib_instance)
+ log.info("Launched instance: %s", pycloudlib_instance)
instance = self.get_instance(pycloudlib_instance, settings)
- if launch_kwargs.get('wait', True):
+ if launch_kwargs.get("wait", True):
# If we aren't waiting, we can't rely on command execution here
log.info(
- 'cloud-init version: %s',
- instance.execute("cloud-init --version")
+ "cloud-init version: %s",
+ instance.execute("cloud-init --version"),
)
serial = instance.execute("grep serial /etc/cloud/build.info")
if serial:
- log.info('image serial: %s', serial.split()[1])
+ log.info("image serial: %s", serial.split()[1])
return instance
def get_instance(self, cloud_instance, settings=integration_settings):
@@ -199,66 +207,70 @@ class IntegrationCloud(ABC):
if self.snapshot_id:
if self.settings.KEEP_IMAGE:
log.info(
- 'NOT deleting snapshot image created for this testrun '
- 'because KEEP_IMAGE is True: %s', self.snapshot_id)
+ "NOT deleting snapshot image created for this testrun "
+ "because KEEP_IMAGE is True: %s",
+ self.snapshot_id,
+ )
else:
log.info(
- 'Deleting snapshot image created for this testrun: %s',
- self.snapshot_id
+ "Deleting snapshot image created for this testrun: %s",
+ self.snapshot_id,
)
self.cloud_instance.delete_image(self.snapshot_id)
class Ec2Cloud(IntegrationCloud):
- datasource = 'ec2'
+ datasource = "ec2"
integration_instance_cls = IntegrationEc2Instance
def _get_cloud_instance(self):
- return EC2(tag='ec2-integration-test')
+ return EC2(tag="ec2-integration-test")
class GceCloud(IntegrationCloud):
- datasource = 'gce'
+ datasource = "gce"
integration_instance_cls = IntegrationGceInstance
def _get_cloud_instance(self):
return GCE(
- tag='gce-integration-test',
+ tag="gce-integration-test",
)
class AzureCloud(IntegrationCloud):
- datasource = 'azure'
+ datasource = "azure"
integration_instance_cls = IntegrationAzureInstance
def _get_cloud_instance(self):
- return Azure(tag='azure-integration-test')
+ return Azure(tag="azure-integration-test")
def destroy(self):
if self.settings.KEEP_INSTANCE:
log.info(
- 'NOT deleting resource group because KEEP_INSTANCE is true '
- 'and deleting resource group would also delete instance. '
- 'Instance and resource group must both be manually deleted.'
+ "NOT deleting resource group because KEEP_INSTANCE is true "
+ "and deleting resource group would also delete instance. "
+ "Instance and resource group must both be manually deleted."
)
else:
self.cloud_instance.delete_resource_group()
class OciCloud(IntegrationCloud):
- datasource = 'oci'
+ datasource = "oci"
integration_instance_cls = IntegrationOciInstance
def _get_cloud_instance(self):
if not integration_settings.ORACLE_AVAILABILITY_DOMAIN:
raise Exception(
- 'ORACLE_AVAILABILITY_DOMAIN must be set to a valid '
- 'availability domain. If using the oracle CLI, '
- 'try `oci iam availability-domain list`'
+ "ORACLE_AVAILABILITY_DOMAIN must be set to a valid "
+ "availability domain. If using the oracle CLI, "
+ "try `oci iam availability-domain list`"
)
return OCI(
- tag='oci-integration-test',
- availability_domain=integration_settings.ORACLE_AVAILABILITY_DOMAIN
+ tag="oci-integration-test",
+ availability_domain=(
+ integration_settings.ORACLE_AVAILABILITY_DOMAIN,
+ ),
)
@@ -277,38 +289,42 @@ class _LxdIntegrationCloud(IntegrationCloud):
def _mount_source(instance: LXDInstance):
cloudinit_path = cloudinit.__path__[0]
mounts = [
- (cloudinit_path, '/usr/lib/python3/dist-packages/cloudinit'),
- (os.path.join(cloudinit_path, '..', 'config', 'cloud.cfg.d'),
- '/etc/cloud/cloud.cfg.d'),
- (os.path.join(cloudinit_path, '..', 'templates'),
- '/etc/cloud/templates'),
+ (cloudinit_path, "/usr/lib/python3/dist-packages/cloudinit"),
+ (
+ os.path.join(cloudinit_path, "..", "config", "cloud.cfg.d"),
+ "/etc/cloud/cloud.cfg.d",
+ ),
+ (
+ os.path.join(cloudinit_path, "..", "templates"),
+ "/etc/cloud/templates",
+ ),
]
for (n, (source_path, target_path)) in enumerate(mounts):
format_variables = {
- 'name': instance.name,
- 'source_path': os.path.realpath(source_path),
- 'container_path': target_path,
- 'idx': n,
+ "name": instance.name,
+ "source_path": os.path.realpath(source_path),
+ "container_path": target_path,
+ "idx": n,
}
log.info(
- 'Mounting source %(source_path)s directly onto LXD'
- ' container/VM named %(name)s at %(container_path)s',
- format_variables
+ "Mounting source %(source_path)s directly onto LXD"
+ " container/VM named %(name)s at %(container_path)s",
+ format_variables,
)
command = (
- 'lxc config device add {name} host-cloud-init-{idx} disk '
- 'source={source_path} '
- 'path={container_path}'
+ "lxc config device add {name} host-cloud-init-{idx} disk "
+ "source={source_path} "
+ "path={container_path}"
).format(**format_variables)
subp(command.split())
def _perform_launch(self, launch_kwargs, **kwargs):
- launch_kwargs['inst_type'] = launch_kwargs.pop('instance_type', None)
- wait = launch_kwargs.pop('wait', True)
- release = launch_kwargs.pop('image_id')
+ launch_kwargs["inst_type"] = launch_kwargs.pop("instance_type", None)
+ wait = launch_kwargs.pop("wait", True)
+ release = launch_kwargs.pop("image_id")
try:
- profile_list = launch_kwargs['profile_list']
+ profile_list = launch_kwargs["profile_list"]
except KeyError:
profile_list = self._get_or_set_profile_list(release)
@@ -317,52 +333,53 @@ class _LxdIntegrationCloud(IntegrationCloud):
random.choices(string.ascii_lowercase + string.digits, k=8)
)
pycloudlib_instance = self.cloud_instance.init(
- launch_kwargs.pop('name', default_name),
+ launch_kwargs.pop("name", default_name),
release,
profile_list=profile_list,
**launch_kwargs
)
- if self.settings.CLOUD_INIT_SOURCE == 'IN_PLACE':
+ if self.settings.CLOUD_INIT_SOURCE == "IN_PLACE":
self._mount_source(pycloudlib_instance)
- if 'lxd_setup' in kwargs:
+ if "lxd_setup" in kwargs:
log.info("Running callback specified by 'lxd_setup' mark")
- kwargs['lxd_setup'](pycloudlib_instance)
+ kwargs["lxd_setup"](pycloudlib_instance)
pycloudlib_instance.start(wait=wait)
return pycloudlib_instance
class LxdContainerCloud(_LxdIntegrationCloud):
- datasource = 'lxd_container'
+ datasource = "lxd_container"
pycloudlib_instance_cls = LXDContainer
- instance_tag = 'lxd-container-integration-test'
+ instance_tag = "lxd-container-integration-test"
class LxdVmCloud(_LxdIntegrationCloud):
- datasource = 'lxd_vm'
+ datasource = "lxd_vm"
pycloudlib_instance_cls = LXDVirtualMachine
- instance_tag = 'lxd-vm-integration-test'
+ instance_tag = "lxd-vm-integration-test"
_profile_list = None
def _get_or_set_profile_list(self, release):
if self._profile_list:
return self._profile_list
self._profile_list = self.cloud_instance.build_necessary_profiles(
- release)
+ release
+ )
return self._profile_list
class OpenstackCloud(IntegrationCloud):
- datasource = 'openstack'
+ datasource = "openstack"
integration_instance_cls = IntegrationInstance
def _get_cloud_instance(self):
if not integration_settings.OPENSTACK_NETWORK:
raise Exception(
- 'OPENSTACK_NETWORK must be set to a valid Openstack network. '
- 'If using the openstack CLI, try `openstack network list`'
+ "OPENSTACK_NETWORK must be set to a valid Openstack network. "
+ "If using the openstack CLI, try `openstack network list`"
)
return Openstack(
- tag='openstack-integration-test',
+ tag="openstack-integration-test",
network=integration_settings.OPENSTACK_NETWORK,
)
@@ -372,9 +389,9 @@ class OpenstackCloud(IntegrationCloud):
UUID(image.image_id)
except ValueError as e:
raise Exception(
- 'When using Openstack, `OS_IMAGE` MUST be specified with '
- 'a 36-character UUID image ID. Passing in a release name is '
- 'not valid here.\n'
- 'OS image id: {}'.format(image.image_id)
+ "When using Openstack, `OS_IMAGE` MUST be specified with "
+ "a 36-character UUID image ID. Passing in a release name is "
+ "not valid here.\n"
+ "OS image id: {}".format(image.image_id)
) from e
return image.image_id
diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py
index 5eab5a45..b14b6ad0 100644
--- a/tests/integration_tests/conftest.py
+++ b/tests/integration_tests/conftest.py
@@ -2,12 +2,13 @@
import datetime
import functools
import logging
-import pytest
import os
import sys
-from tarfile import TarFile
from contextlib import contextmanager
from pathlib import Path
+from tarfile import TarFile
+
+import pytest
from tests.integration_tests import integration_settings
from tests.integration_tests.clouds import (
@@ -19,31 +20,30 @@ from tests.integration_tests.clouds import (
LxdContainerCloud,
LxdVmCloud,
OciCloud,
- _LxdIntegrationCloud,
OpenstackCloud,
+ _LxdIntegrationCloud,
)
from tests.integration_tests.instances import (
CloudInitSource,
IntegrationInstance,
)
-
-log = logging.getLogger('integration_testing')
+log = logging.getLogger("integration_testing")
log.addHandler(logging.StreamHandler(sys.stdout))
log.setLevel(logging.INFO)
platforms = {
- 'ec2': Ec2Cloud,
- 'gce': GceCloud,
- 'azure': AzureCloud,
- 'oci': OciCloud,
- 'lxd_container': LxdContainerCloud,
- 'lxd_vm': LxdVmCloud,
- 'openstack': OpenstackCloud,
+ "ec2": Ec2Cloud,
+ "gce": GceCloud,
+ "azure": AzureCloud,
+ "oci": OciCloud,
+ "lxd_container": LxdContainerCloud,
+ "lxd_vm": LxdVmCloud,
+ "openstack": OpenstackCloud,
}
os_list = ["ubuntu"]
-session_start_time = datetime.datetime.now().strftime('%y%m%d%H%M%S')
+session_start_time = datetime.datetime.now().strftime("%y%m%d%H%M%S")
XENIAL_LXD_VM_EXEC_MSG = """\
The default xenial images do not support `exec` for LXD VMs.
@@ -69,14 +69,14 @@ def pytest_runtest_setup(item):
test_marks = [mark.name for mark in item.iter_markers()]
supported_platforms = set(all_platforms).intersection(test_marks)
current_platform = integration_settings.PLATFORM
- unsupported_message = 'Cannot run on platform {}'.format(current_platform)
- if 'no_container' in test_marks:
- if 'lxd_container' in test_marks:
+ unsupported_message = "Cannot run on platform {}".format(current_platform)
+ if "no_container" in test_marks:
+ if "lxd_container" in test_marks:
raise Exception(
- 'lxd_container and no_container marks simultaneously set '
- 'on test'
+ "lxd_container and no_container marks simultaneously set "
+ "on test"
)
- if current_platform == 'lxd_container':
+ if current_platform == "lxd_container":
pytest.skip(unsupported_message)
if supported_platforms and current_platform not in supported_platforms:
pytest.skip(unsupported_message)
@@ -86,8 +86,8 @@ def pytest_runtest_setup(item):
supported_os_set = set(os_list).intersection(test_marks)
if current_os and supported_os_set and current_os not in supported_os_set:
pytest.skip("Cannot run on OS {}".format(current_os))
- if 'unstable' in test_marks and not integration_settings.RUN_UNSTABLE:
- pytest.skip('Test marked unstable. Manually remove mark to run it')
+ if "unstable" in test_marks and not integration_settings.RUN_UNSTABLE:
+ pytest.skip("Test marked unstable. Manually remove mark to run it")
current_release = image.release
if "not_{}".format(current_release) in test_marks:
@@ -101,7 +101,7 @@ def disable_subp_usage(request):
pass
-@pytest.yield_fixture(scope='session')
+@pytest.yield_fixture(scope="session")
def session_cloud():
if integration_settings.PLATFORM not in platforms.keys():
raise ValueError(
@@ -122,28 +122,30 @@ def session_cloud():
def get_validated_source(
session_cloud: IntegrationCloud,
- source=integration_settings.CLOUD_INIT_SOURCE
+ source=integration_settings.CLOUD_INIT_SOURCE,
) -> CloudInitSource:
- if source == 'NONE':
+ if source == "NONE":
return CloudInitSource.NONE
- elif source == 'IN_PLACE':
- if session_cloud.datasource not in ['lxd_container', 'lxd_vm']:
+ elif source == "IN_PLACE":
+ if session_cloud.datasource not in ["lxd_container", "lxd_vm"]:
raise ValueError(
- 'IN_PLACE as CLOUD_INIT_SOURCE only works for LXD')
+ "IN_PLACE as CLOUD_INIT_SOURCE only works for LXD"
+ )
return CloudInitSource.IN_PLACE
- elif source == 'PROPOSED':
+ elif source == "PROPOSED":
return CloudInitSource.PROPOSED
- elif source.startswith('ppa:'):
+ elif source.startswith("ppa:"):
return CloudInitSource.PPA
elif os.path.isfile(str(source)):
return CloudInitSource.DEB_PACKAGE
elif source == "UPGRADE":
return CloudInitSource.UPGRADE
raise ValueError(
- 'Invalid value for CLOUD_INIT_SOURCE setting: {}'.format(source))
+ "Invalid value for CLOUD_INIT_SOURCE setting: {}".format(source)
+ )
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
def setup_image(session_cloud: IntegrationCloud):
"""Setup the target environment with the correct version of cloud-init.
@@ -153,17 +155,18 @@ def setup_image(session_cloud: IntegrationCloud):
source = get_validated_source(session_cloud)
if not source.installs_new_version():
return
- log.info('Setting up environment for %s', session_cloud.datasource)
+ log.info("Setting up environment for %s", session_cloud.datasource)
client = session_cloud.launch()
client.install_new_cloud_init(source)
# Even if we're keeping instances, we don't want to keep this
# one around as it was just for image creation
client.destroy()
- log.info('Done with environment setup')
+ log.info("Done with environment setup")
-def _collect_logs(instance: IntegrationInstance, node_id: str,
- test_failed: bool):
+def _collect_logs(
+ instance: IntegrationInstance, node_id: str, test_failed: bool
+):
"""Collect logs from remote instance.
Args:
@@ -172,36 +175,43 @@ def _collect_logs(instance: IntegrationInstance, node_id: str,
tests/integration_tests/test_example.py::TestExample.test_example
test_failed: If test failed or not
"""
- if any([
- integration_settings.COLLECT_LOGS == 'NEVER',
- integration_settings.COLLECT_LOGS == 'ON_ERROR' and not test_failed
- ]):
+ if any(
+ [
+ integration_settings.COLLECT_LOGS == "NEVER",
+ integration_settings.COLLECT_LOGS == "ON_ERROR"
+ and not test_failed,
+ ]
+ ):
return
instance.execute(
- 'cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz')
+ "cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz"
+ )
node_id_path = Path(
- node_id
- .replace('.py', '') # Having a directory with '.py' would be weird
- .replace('::', os.path.sep) # Turn classes/tests into paths
- .replace('[', '-') # For parametrized names
- .replace(']', '') # For parameterized names
+ node_id.replace(
+ ".py", ""
+ ) # Having a directory with '.py' would be weird
+ .replace("::", os.path.sep) # Turn classes/tests into paths
+ .replace("[", "-") # For parametrized names
+ .replace("]", "") # For parameterized names
+ )
+ log_dir = (
+ Path(integration_settings.LOCAL_LOG_PATH)
+ / session_start_time
+ / node_id_path
)
- log_dir = Path(
- integration_settings.LOCAL_LOG_PATH
- ) / session_start_time / node_id_path
log.info("Writing logs to %s", log_dir)
if not log_dir.exists():
log_dir.mkdir(parents=True)
# Add a symlink to the latest log output directory
- last_symlink = Path(integration_settings.LOCAL_LOG_PATH) / 'last'
+ last_symlink = Path(integration_settings.LOCAL_LOG_PATH) / "last"
if os.path.islink(last_symlink):
os.unlink(last_symlink)
os.symlink(log_dir.parent, last_symlink)
- tarball_path = log_dir / 'cloud-init.tar.gz'
- instance.pull_file('/var/tmp/cloud-init.tar.gz', tarball_path)
+ tarball_path = log_dir / "cloud-init.tar.gz"
+ instance.pull_file("/var/tmp/cloud-init.tar.gz", tarball_path)
tarball = TarFile.open(str(tarball_path))
tarball.extractall(path=str(log_dir))
@@ -218,12 +228,12 @@ def _client(request, fixture_utils, session_cloud: IntegrationCloud):
getter = functools.partial(
fixture_utils.closest_marker_first_arg_or, request, default=None
)
- user_data = getter('user_data')
- name = getter('instance_name')
- lxd_config_dict = getter('lxd_config_dict')
- lxd_setup = getter('lxd_setup')
+ user_data = getter("user_data")
+ name = getter("instance_name")
+ lxd_config_dict = getter("lxd_config_dict")
+ lxd_setup = getter("lxd_setup")
lxd_use_exec = fixture_utils.closest_marker_args_or(
- request, 'lxd_use_exec', None
+ request, "lxd_use_exec", None
)
launch_kwargs = {}
@@ -250,8 +260,8 @@ def _client(request, fixture_utils, session_cloud: IntegrationCloud):
local_launch_kwargs = {}
if lxd_setup is not None:
if not isinstance(session_cloud, _LxdIntegrationCloud):
- pytest.skip('lxd_setup requires LXD')
- local_launch_kwargs['lxd_setup'] = lxd_setup
+ pytest.skip("lxd_setup requires LXD")
+ local_launch_kwargs["lxd_setup"] = lxd_setup
with session_cloud.launch(
user_data=user_data, launch_kwargs=launch_kwargs, **local_launch_kwargs
@@ -273,14 +283,14 @@ def client(request, fixture_utils, session_cloud, setup_image):
yield client
-@pytest.yield_fixture(scope='module')
+@pytest.yield_fixture(scope="module")
def module_client(request, fixture_utils, session_cloud, setup_image):
"""Provide a client that runs once per module."""
with _client(request, fixture_utils, session_cloud) as client:
yield client
-@pytest.yield_fixture(scope='class')
+@pytest.yield_fixture(scope="class")
def class_client(request, fixture_utils, session_cloud, setup_image):
"""Provide a client that runs once per class."""
with _client(request, fixture_utils, session_cloud) as client:
diff --git a/tests/integration_tests/datasources/test_lxd_discovery.py b/tests/integration_tests/datasources/test_lxd_discovery.py
index 3f05e906..da010813 100644
--- a/tests/integration_tests/datasources/test_lxd_discovery.py
+++ b/tests/integration_tests/datasources/test_lxd_discovery.py
@@ -1,4 +1,5 @@
import json
+
import pytest
import yaml
@@ -9,10 +10,10 @@ from tests.integration_tests.util import verify_clean_log
def _customize_envionment(client: IntegrationInstance):
client.write_to_file(
- '/etc/cloud/cloud.cfg.d/99-detect-lxd.cfg',
- 'datasource_list: [LXD]\n',
+ "/etc/cloud/cloud.cfg.d/99-detect-lxd.cfg",
+ "datasource_list: [LXD]\n",
)
- client.execute('cloud-init clean --logs')
+ client.execute("cloud-init clean --logs")
client.restart()
@@ -25,40 +26,44 @@ def test_lxd_datasource_discovery(client: IntegrationInstance):
"""Test that DataSourceLXD is detected instead of NoCloud."""
_customize_envionment(client)
nic_dev = "enp5s0" if client.settings.PLATFORM == "lxd_vm" else "eth0"
- result = client.execute('cloud-init status --long')
+ result = client.execute("cloud-init status --long")
if not result.ok:
- raise AssertionError('cloud-init failed:\n%s', result.stderr)
+ raise AssertionError("cloud-init failed:\n%s", result.stderr)
if "DataSourceLXD" not in result.stdout:
raise AssertionError(
- 'cloud-init did not discover DataSourceLXD', result.stdout
+ "cloud-init did not discover DataSourceLXD", result.stdout
)
- netplan_yaml = client.execute('cat /etc/netplan/50-cloud-init.yaml')
+ netplan_yaml = client.execute("cat /etc/netplan/50-cloud-init.yaml")
netplan_cfg = yaml.safe_load(netplan_yaml)
assert {
- 'network': {'ethernets': {nic_dev: {'dhcp4': True}}, 'version': 2}
+ "network": {"ethernets": {nic_dev: {"dhcp4": True}}, "version": 2}
} == netplan_cfg
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
- result = client.execute('cloud-id')
+ result = client.execute("cloud-id")
if result.stdout != "lxd":
raise AssertionError(
"cloud-id didn't report lxd. Result: %s", result.stdout
)
# Validate config instance data represented
- data = json.loads(client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ data = json.loads(
+ client.read_from_file("/run/cloud-init/instance-data.json")
)
v1 = data["v1"]
ds_cfg = data["ds"]
assert "lxd" == v1["platform"]
assert "LXD socket API v. 1.0 (/dev/lxd/sock)" == v1["subplatform"]
- ds_cfg = json.loads(client.execute('cloud-init query ds').stdout)
+ ds_cfg = json.loads(client.execute("cloud-init query ds").stdout)
assert ["_doc", "_metadata_api_version", "config", "meta-data"] == sorted(
list(ds_cfg.keys())
)
if (
- client.settings.PLATFORM == "lxd_vm" and
- ImageSpecification.from_os_image().release in ("xenial", "bionic")
+ client.settings.PLATFORM == "lxd_vm"
+ and ImageSpecification.from_os_image().release
+ in (
+ "xenial",
+ "bionic",
+ )
):
# pycloudlib injects user.vendor_data for lxd_vm on bionic and xenial
# to start the lxd-agent.
@@ -74,17 +79,13 @@ def test_lxd_datasource_discovery(client: IntegrationInstance):
assert {"public-keys": v1["public_ssh_keys"][0]} == (
yaml.safe_load(ds_cfg["config"]["user.meta-data"])
)
- assert (
- "#cloud-config\ninstance-id" in ds_cfg["meta-data"]
- )
+ assert "#cloud-config\ninstance-id" in ds_cfg["meta-data"]
# Assert NoCloud seed data is still present in cloud image metadata
# This will start failing if we redact metadata templates from
# https://cloud-images.ubuntu.com/daily/server/jammy/current/\
# jammy-server-cloudimg-amd64-lxd.tar.xz
nocloud_metadata = yaml.safe_load(
- client.read_from_file(
- "/var/lib/cloud/seed/nocloud-net/meta-data"
- )
+ client.read_from_file("/var/lib/cloud/seed/nocloud-net/meta-data")
)
assert client.instance.name == nocloud_metadata["instance-id"]
assert (
diff --git a/tests/integration_tests/datasources/test_network_dependency.py b/tests/integration_tests/datasources/test_network_dependency.py
index 24e71f9d..32ac7053 100644
--- a/tests/integration_tests/datasources/test_network_dependency.py
+++ b/tests/integration_tests/datasources/test_network_dependency.py
@@ -6,10 +6,10 @@ from tests.integration_tests.instances import IntegrationInstance
def _customize_envionment(client: IntegrationInstance):
# Insert our "disable_network_activation" file here
client.write_to_file(
- '/etc/cloud/cloud.cfg.d/99-disable-network-activation.cfg',
- 'disable_network_activation: true\n',
+ "/etc/cloud/cloud.cfg.d/99-disable-network-activation.cfg",
+ "disable_network_activation: true\n",
)
- client.execute('cloud-init clean --logs')
+ client.execute("cloud-init clean --logs")
client.restart()
@@ -20,13 +20,14 @@ def _customize_envionment(client: IntegrationInstance):
def test_network_activation_disabled(client: IntegrationInstance):
"""Test that the network is not activated during init mode."""
_customize_envionment(client)
- result = client.execute('systemctl status google-guest-agent.service')
+ result = client.execute("systemctl status google-guest-agent.service")
if not result.ok:
raise AssertionError(
- 'google-guest-agent is not active:\n%s', result.stdout)
- log = client.read_from_file('/var/log/cloud-init.log')
+ "google-guest-agent is not active:\n%s", result.stdout
+ )
+ log = client.read_from_file("/var/log/cloud-init.log")
assert "Running command ['netplan', 'apply']" not in log
- assert 'Not bringing up newly configured network interfaces' in log
- assert 'Bringing up newly configured network interfaces' not in log
+ assert "Not bringing up newly configured network interfaces" in log
+ assert "Bringing up newly configured network interfaces" not in log
diff --git a/tests/integration_tests/instances.py b/tests/integration_tests/instances.py
index 8f66bf43..793f729e 100644
--- a/tests/integration_tests/instances.py
+++ b/tests/integration_tests/instances.py
@@ -1,8 +1,8 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from enum import Enum
import logging
import os
import uuid
+from enum import Enum
from tempfile import NamedTemporaryFile
from pycloudlib.instance import BaseInstance
@@ -13,20 +13,21 @@ from tests.integration_tests.util import retry
try:
from typing import TYPE_CHECKING
+
if TYPE_CHECKING:
from tests.integration_tests.clouds import ( # noqa: F401
- IntegrationCloud
+ IntegrationCloud,
)
except ImportError:
pass
-log = logging.getLogger('integration_testing')
+log = logging.getLogger("integration_testing")
def _get_tmp_path():
tmp_filename = str(uuid.uuid4())
- return '/var/tmp/{}.tmp'.format(tmp_filename)
+ return "/var/tmp/{}.tmp".format(tmp_filename)
class CloudInitSource(Enum):
@@ -37,6 +38,7 @@ class CloudInitSource(Enum):
explanation of these values. If the value set there can't be parsed into
one of these values, an exception will be raised
"""
+
NONE = 1
IN_PLACE = 2
PROPOSED = 3
@@ -51,8 +53,12 @@ class CloudInitSource(Enum):
class IntegrationInstance:
- def __init__(self, cloud: 'IntegrationCloud', instance: BaseInstance,
- settings=integration_settings):
+ def __init__(
+ self,
+ cloud: "IntegrationCloud",
+ instance: BaseInstance,
+ settings=integration_settings,
+ ):
self.cloud = cloud
self.instance = instance
self.settings = settings
@@ -69,41 +75,44 @@ class IntegrationInstance:
self.instance.restart()
def execute(self, command, *, use_sudo=True) -> Result:
- if self.instance.username == 'root' and use_sudo is False:
- raise Exception('Root user cannot run unprivileged')
+ if self.instance.username == "root" and use_sudo is False:
+ raise Exception("Root user cannot run unprivileged")
return self.instance.execute(command, use_sudo=use_sudo)
def pull_file(self, remote_path, local_path):
# First copy to a temporary directory because of permissions issues
tmp_path = _get_tmp_path()
- self.instance.execute('cp {} {}'.format(str(remote_path), tmp_path))
+ self.instance.execute("cp {} {}".format(str(remote_path), tmp_path))
self.instance.pull_file(tmp_path, str(local_path))
def push_file(self, local_path, remote_path):
# First push to a temporary directory because of permissions issues
tmp_path = _get_tmp_path()
self.instance.push_file(str(local_path), tmp_path)
- self.execute('mv {} {}'.format(tmp_path, str(remote_path)))
+ self.execute("mv {} {}".format(tmp_path, str(remote_path)))
def read_from_file(self, remote_path) -> str:
- result = self.execute('cat {}'.format(remote_path))
+ result = self.execute("cat {}".format(remote_path))
if result.failed:
# TODO: Raise here whatever pycloudlib raises when it has
# a consistent error response
raise IOError(
- 'Failed reading remote file via cat: {}\n'
- 'Return code: {}\n'
- 'Stderr: {}\n'
- 'Stdout: {}'.format(
- remote_path, result.return_code,
- result.stderr, result.stdout)
+ "Failed reading remote file via cat: {}\n"
+ "Return code: {}\n"
+ "Stderr: {}\n"
+ "Stdout: {}".format(
+ remote_path,
+ result.return_code,
+ result.stderr,
+ result.stdout,
+ )
)
return result.stdout
def write_to_file(self, remote_path, contents: str):
# Writes file locally and then pushes it rather
# than writing the file directly on the instance
- with NamedTemporaryFile('w', delete=False) as tmp_file:
+ with NamedTemporaryFile("w", delete=False) as tmp_file:
tmp_file.write(contents)
try:
@@ -113,7 +122,7 @@ class IntegrationInstance:
def snapshot(self):
image_id = self.cloud.snapshot(self.instance)
- log.info('Created new image: %s', image_id)
+ log.info("Created new image: %s", image_id)
return image_id
def install_new_cloud_init(
@@ -133,10 +142,11 @@ class IntegrationInstance:
else:
raise Exception(
"Specified to install {} which isn't supported here".format(
- source)
+ source
+ )
)
- version = self.execute('cloud-init -v').split()[-1]
- log.info('Installed cloud-init version: %s', version)
+ version = self.execute("cloud-init -v").split()[-1]
+ log.info("Installed cloud-init version: %s", version)
if clean:
self.instance.clean()
if take_snapshot:
@@ -149,38 +159,39 @@ class IntegrationInstance:
@retry(tries=30, delay=1)
def install_proposed_image(self):
- log.info('Installing proposed image')
+ log.info("Installing proposed image")
assert self.execute(
'echo deb "http://archive.ubuntu.com/ubuntu '
'$(lsb_release -sc)-proposed main" >> '
- '/etc/apt/sources.list.d/proposed.list'
+ "/etc/apt/sources.list.d/proposed.list"
).ok
- assert self.execute('apt-get update -q').ok
- assert self.execute('apt-get install -qy cloud-init').ok
+ assert self.execute("apt-get update -q").ok
+ assert self.execute("apt-get install -qy cloud-init").ok
@retry(tries=30, delay=1)
def install_ppa(self):
- log.info('Installing PPA')
- assert self.execute('add-apt-repository {} -y'.format(
- self.settings.CLOUD_INIT_SOURCE)
+ log.info("Installing PPA")
+ assert self.execute(
+ "add-apt-repository {} -y".format(self.settings.CLOUD_INIT_SOURCE)
).ok
- assert self.execute('apt-get update -q').ok
- assert self.execute('apt-get install -qy cloud-init').ok
+ assert self.execute("apt-get update -q").ok
+ assert self.execute("apt-get install -qy cloud-init").ok
@retry(tries=30, delay=1)
def install_deb(self):
- log.info('Installing deb package')
+ log.info("Installing deb package")
deb_path = integration_settings.CLOUD_INIT_SOURCE
deb_name = os.path.basename(deb_path)
- remote_path = '/var/tmp/{}'.format(deb_name)
+ remote_path = "/var/tmp/{}".format(deb_name)
self.push_file(
local_path=integration_settings.CLOUD_INIT_SOURCE,
- remote_path=remote_path)
- assert self.execute('dpkg -i {path}'.format(path=remote_path)).ok
+ remote_path=remote_path,
+ )
+ assert self.execute("dpkg -i {path}".format(path=remote_path)).ok
@retry(tries=30, delay=1)
def upgrade_cloud_init(self):
- log.info('Upgrading cloud-init to latest version in archive')
+ log.info("Upgrading cloud-init to latest version in archive")
assert self.execute("apt-get update -q").ok
assert self.execute("apt-get install -qy cloud-init").ok
diff --git a/tests/integration_tests/integration_settings.py b/tests/integration_tests/integration_settings.py
index e4a790c2..641ce297 100644
--- a/tests/integration_tests/integration_settings.py
+++ b/tests/integration_tests/integration_settings.py
@@ -1,6 +1,5 @@
# This file is part of cloud-init. See LICENSE file for license information.
import os
-
from distutils.util import strtobool
##################################################################
@@ -22,7 +21,7 @@ RUN_UNSTABLE = False
# gce
# oci
# openstack
-PLATFORM = 'lxd_container'
+PLATFORM = "lxd_container"
# The cloud-specific instance type to run. E.g., a1.medium on AWS
# If the pycloudlib instance provides a default, this can be left None
@@ -34,7 +33,7 @@ INSTANCE_TYPE = None
# <image_id>[::<os>[::<release>]]. If given, os and release should describe
# the image specified by image_id. (Ubuntu releases are converted to this
# format internally; in this case, to "focal::ubuntu::focal".)
-OS_IMAGE = 'focal'
+OS_IMAGE = "focal"
# Populate if you want to use a pre-launched instance instead of
# creating a new one. The exact contents will be platform dependent
@@ -66,7 +65,7 @@ EXISTING_INSTANCE_ID = None
# Install from a PPA. It MUST start with 'ppa:'
# <file path>
# A path to a valid package to be uploaded and installed
-CLOUD_INIT_SOURCE = 'NONE'
+CLOUD_INIT_SOURCE = "NONE"
# Before an instance is torn down, we run `cloud-init collect-logs`
# and transfer them locally. These settings specify when to collect these
@@ -75,8 +74,8 @@ CLOUD_INIT_SOURCE = 'NONE'
# 'ALWAYS'
# 'ON_ERROR'
# 'NEVER'
-COLLECT_LOGS = 'ON_ERROR'
-LOCAL_LOG_PATH = '/tmp/cloud_init_test_logs'
+COLLECT_LOGS = "ON_ERROR"
+LOCAL_LOG_PATH = "/tmp/cloud_init_test_logs"
##################################################################
# SSH KEY SETTINGS
@@ -124,7 +123,7 @@ except ImportError:
current_settings = [var for var in locals() if var.isupper()]
for setting in current_settings:
env_setting = os.getenv(
- 'CLOUD_INIT_{}'.format(setting), globals()[setting]
+ "CLOUD_INIT_{}".format(setting), globals()[setting]
)
if isinstance(env_setting, str):
try:
diff --git a/tests/integration_tests/modules/test_apt.py b/tests/integration_tests/modules/test_apt.py
index f5f6c813..48f398d1 100644
--- a/tests/integration_tests/modules/test_apt.py
+++ b/tests/integration_tests/modules/test_apt.py
@@ -3,12 +3,11 @@ import re
import pytest
-from cloudinit.config import cc_apt_configure
from cloudinit import gpg
+from cloudinit.config import cc_apt_configure
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
-
USER_DATA = """\
#cloud-config
apt:
@@ -104,14 +103,15 @@ class TestApt:
"""Return all keys in /etc/apt/trusted.gpg.d/ and /etc/apt/trusted.gpg
in human readable format. Mimics the output of apt-key finger
"""
- list_cmd = ' '.join(gpg.GPG_LIST) + ' '
+ list_cmd = " ".join(gpg.GPG_LIST) + " "
keys = class_client.execute(list_cmd + cc_apt_configure.APT_LOCAL_KEYS)
print(keys)
files = class_client.execute(
- 'ls ' + cc_apt_configure.APT_TRUSTED_GPG_DIR)
+ "ls " + cc_apt_configure.APT_TRUSTED_GPG_DIR
+ )
for file in files.split():
path = cc_apt_configure.APT_TRUSTED_GPG_DIR + file
- keys += class_client.execute(list_cmd + path) or ''
+ keys += class_client.execute(list_cmd + path) or ""
return keys
def test_sources_list(self, class_client: IntegrationInstance):
@@ -124,8 +124,8 @@ class TestApt:
(This is ported from
`tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml`.)
"""
- sources_list = class_client.read_from_file('/etc/apt/sources.list')
- assert 6 == len(sources_list.rstrip().split('\n'))
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
+ assert 6 == len(sources_list.rstrip().split("\n"))
for expected_re in EXPECTED_REGEXES:
assert re.search(expected_re, sources_list) is not None
@@ -136,7 +136,7 @@ class TestApt:
Ported from tests/cloud_tests/testcases/modules/apt_configure_conf.py
"""
apt_config = class_client.read_from_file(
- '/etc/apt/apt.conf.d/94cloud-init-config'
+ "/etc/apt/apt.conf.d/94cloud-init-config"
)
assert 'Assume-Yes "true";' in apt_config
assert 'Fix-Broken "true";' in apt_config
@@ -149,40 +149,43 @@ class TestApt:
"""
release = ImageSpecification.from_os_image().release
ppa_path_contents = class_client.read_from_file(
- '/etc/apt/sources.list.d/'
- 'simplestreams-dev-ubuntu-trunk-{}.list'.format(release)
+ "/etc/apt/sources.list.d/"
+ "simplestreams-dev-ubuntu-trunk-{}.list".format(release)
)
assert (
- 'http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu'
- ) in ppa_path_contents
+ "http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu"
+ in ppa_path_contents
+ )
assert TEST_PPA_KEY in self.get_keys(class_client)
def test_signed_by(self, class_client: IntegrationInstance):
- """Test the apt signed-by functionality.
- """
+ """Test the apt signed-by functionality."""
release = ImageSpecification.from_os_image().release
source = (
"deb [signed-by=/etc/apt/cloud-init.gpg.d/test_signed_by.gpg] "
"http://ppa.launchpad.net/juju/stable/ubuntu"
- " {} main".format(release))
+ " {} main".format(release)
+ )
path_contents = class_client.read_from_file(
- '/etc/apt/sources.list.d/test_signed_by.list')
+ "/etc/apt/sources.list.d/test_signed_by.list"
+ )
assert path_contents == source
key = class_client.execute(
- 'gpg --no-default-keyring --with-fingerprint --list-keys '
- '--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg')
+ "gpg --no-default-keyring --with-fingerprint --list-keys "
+ "--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg"
+ )
assert TEST_SIGNED_BY_KEY in key
def test_bad_key(self, class_client: IntegrationInstance):
- """Test the apt signed-by functionality.
- """
+ """Test the apt signed-by functionality."""
with pytest.raises(OSError):
class_client.read_from_file(
- '/etc/apt/trusted.list.d/test_bad_key.gpg')
+ "/etc/apt/trusted.list.d/test_bad_key.gpg"
+ )
def test_key(self, class_client: IntegrationInstance):
"""Test the apt key functionality.
@@ -191,12 +194,13 @@ class TestApt:
tests/cloud_tests/testcases/modules/apt_configure_sources_key.py
"""
test_archive_contents = class_client.read_from_file(
- '/etc/apt/sources.list.d/test_key.list'
+ "/etc/apt/sources.list.d/test_key.list"
)
assert (
- 'http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu'
- ) in test_archive_contents
+ "http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu"
+ in test_archive_contents
+ )
assert TEST_KEY in self.get_keys(class_client)
def test_keyserver(self, class_client: IntegrationInstance):
@@ -206,12 +210,13 @@ class TestApt:
tests/cloud_tests/testcases/modules/apt_configure_sources_keyserver.py
"""
test_keyserver_contents = class_client.read_from_file(
- '/etc/apt/sources.list.d/test_keyserver.list'
+ "/etc/apt/sources.list.d/test_keyserver.list"
)
assert (
- 'http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu'
- ) in test_keyserver_contents
+ "http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu"
+ in test_keyserver_contents
+ )
assert TEST_KEYSERVER_KEY in self.get_keys(class_client)
@@ -221,7 +226,7 @@ class TestApt:
Ported from tests/cloud_tests/testcases/modules/apt_pipelining_os.py
"""
conf_exists = class_client.execute(
- 'test -f /etc/apt/apt.conf.d/90cloud-init-pipelining'
+ "test -f /etc/apt/apt.conf.d/90cloud-init-pipelining"
).ok
assert conf_exists is False
@@ -237,7 +242,7 @@ apt:
- arches:
- default
"""
-DEFAULT_DATA = _DEFAULT_DATA.format(uri='')
+DEFAULT_DATA = _DEFAULT_DATA.format(uri="")
@pytest.mark.ubuntu
@@ -249,9 +254,9 @@ class TestDefaults:
When no uri is provided.
"""
- zone = class_client.execute('cloud-init query v1.availability_zone')
- sources_list = class_client.read_from_file('/etc/apt/sources.list')
- assert '{}.clouds.archive.ubuntu.com'.format(zone) in sources_list
+ zone = class_client.execute("cloud-init query v1.availability_zone")
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
+ assert "{}.clouds.archive.ubuntu.com".format(zone) in sources_list
def test_security(self, class_client: IntegrationInstance):
"""Test apt default security sources.
@@ -259,12 +264,12 @@ class TestDefaults:
Ported from
tests/cloud_tests/testcases/modules/apt_configure_security.py
"""
- sources_list = class_client.read_from_file('/etc/apt/sources.list')
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
# 3 lines from main, universe, and multiverse
- assert 3 == sources_list.count('deb http://security.ubuntu.com/ubuntu')
+ assert 3 == sources_list.count("deb http://security.ubuntu.com/ubuntu")
assert 3 == sources_list.count(
- '# deb-src http://security.ubuntu.com/ubuntu'
+ "# deb-src http://security.ubuntu.com/ubuntu"
)
@@ -280,10 +285,10 @@ def test_default_primary_with_uri(client: IntegrationInstance):
Ported from
tests/cloud_tests/testcases/modules/apt_configure_primary.py
"""
- sources_list = client.read_from_file('/etc/apt/sources.list')
- assert 'archive.ubuntu.com' not in sources_list
+ sources_list = client.read_from_file("/etc/apt/sources.list")
+ assert "archive.ubuntu.com" not in sources_list
- assert 'something.random.invalid' in sources_list
+ assert "something.random.invalid" in sources_list
DISABLED_DATA = """\
@@ -310,7 +315,7 @@ class TestDisabled:
sources_list = class_client.execute(
"cat /etc/apt/sources.list | grep -v '^#'"
).strip()
- assert '' == sources_list
+ assert "" == sources_list
def test_disable_apt_pipelining(self, class_client: IntegrationInstance):
"""Test disabling of apt pipelining.
@@ -319,7 +324,7 @@ class TestDisabled:
tests/cloud_tests/testcases/modules/apt_pipelining_disable.py
"""
conf = class_client.read_from_file(
- '/etc/apt/apt.conf.d/90cloud-init-pipelining'
+ "/etc/apt/apt.conf.d/90cloud-init-pipelining"
)
assert 'Acquire::http::Pipeline-Depth "0";' in conf
@@ -338,8 +343,7 @@ apt:
@pytest.mark.user_data(APT_PROXY_DATA)
def test_apt_proxy(client: IntegrationInstance):
"""Test the apt proxy data gets written correctly."""
- out = client.read_from_file(
- '/etc/apt/apt.conf.d/90cloud-init-aptproxy')
+ out = client.read_from_file("/etc/apt/apt.conf.d/90cloud-init-aptproxy")
assert 'Acquire::http::Proxy "http://proxy.internal:3128";' in out
assert 'Acquire::http::Proxy "http://squid.internal:3128";' in out
assert 'Acquire::ftp::Proxy "ftp://squid.internal:3128";' in out
diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py
index 89c01a9c..d514fc62 100644
--- a/tests/integration_tests/modules/test_ca_certs.py
+++ b/tests/integration_tests/modules/test_ca_certs.py
@@ -10,7 +10,6 @@ import os.path
import pytest
-
USER_DATA = """\
#cloud-config
ca-certs:
diff --git a/tests/integration_tests/modules/test_cli.py b/tests/integration_tests/modules/test_cli.py
index 3f41b34d..97bfe52d 100644
--- a/tests/integration_tests/modules/test_cli.py
+++ b/tests/integration_tests/modules/test_cli.py
@@ -7,7 +7,6 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
-
VALID_USER_DATA = """\
#cloud-config
runcmd:
@@ -27,9 +26,9 @@ def test_valid_userdata(client: IntegrationInstance):
PR #575
"""
- result = client.execute('cloud-init devel schema --system')
+ result = client.execute("cloud-init devel schema --system")
assert result.ok
- assert 'Valid cloud-config: system userdata' == result.stdout.strip()
+ assert "Valid cloud-config: system userdata" == result.stdout.strip()
@pytest.mark.sru_2020_11
@@ -39,7 +38,7 @@ def test_invalid_userdata(client: IntegrationInstance):
PR #575
"""
- result = client.execute('cloud-init devel schema --system')
+ result = client.execute("cloud-init devel schema --system")
assert not result.ok
- assert 'Cloud config schema errors' in result.stderr
+ assert "Cloud config schema errors" in result.stderr
assert 'needs to begin with "#cloud-config"' in result.stderr
diff --git a/tests/integration_tests/modules/test_combined.py b/tests/integration_tests/modules/test_combined.py
index 26a8397d..c88f40d3 100644
--- a/tests/integration_tests/modules/test_combined.py
+++ b/tests/integration_tests/modules/test_combined.py
@@ -6,9 +6,10 @@ the same instance launch. Most independent module coherence tests can go
here.
"""
import json
-import pytest
import re
+import pytest
+
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import (
@@ -76,7 +77,7 @@ class TestCombined:
Also tests LP 1511485: final_message is silent.
"""
client = class_client
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
expected = (
"This is my final message!\n"
r"\d+\.\d+.*\n"
@@ -94,10 +95,10 @@ class TestCombined:
configuring the archives.
"""
client = class_client
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'W: Failed to fetch' not in log
- assert 'W: Some index files failed to download' not in log
- assert 'E: Unable to locate package ntp' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "W: Failed to fetch" not in log
+ assert "W: Some index files failed to download" not in log
+ assert "E: Unable to locate package ntp" not in log
def test_byobu(self, class_client: IntegrationInstance):
"""Test byobu configured as enabled by default."""
@@ -107,22 +108,18 @@ class TestCombined:
def test_configured_locale(self, class_client: IntegrationInstance):
"""Test locale can be configured correctly."""
client = class_client
- default_locale = client.read_from_file('/etc/default/locale')
- assert 'LANG=en_GB.UTF-8' in default_locale
+ default_locale = client.read_from_file("/etc/default/locale")
+ assert "LANG=en_GB.UTF-8" in default_locale
- locale_a = client.execute('locale -a')
- verify_ordered_items_in_text([
- 'en_GB.utf8',
- 'en_US.utf8'
- ], locale_a)
+ locale_a = client.execute("locale -a")
+ verify_ordered_items_in_text(["en_GB.utf8", "en_US.utf8"], locale_a)
locale_gen = client.execute(
"cat /etc/locale.gen | grep -v '^#' | uniq"
)
- verify_ordered_items_in_text([
- 'en_GB.UTF-8',
- 'en_US.UTF-8'
- ], locale_gen)
+ verify_ordered_items_in_text(
+ ["en_GB.UTF-8", "en_US.UTF-8"], locale_gen
+ )
def test_random_seed_data(self, class_client: IntegrationInstance):
"""Integration test for the random seed module.
@@ -141,12 +138,12 @@ class TestCombined:
def test_rsyslog(self, class_client: IntegrationInstance):
"""Test rsyslog is configured correctly."""
client = class_client
- assert 'My test log' in client.read_from_file('/var/tmp/rsyslog.log')
+ assert "My test log" in client.read_from_file("/var/tmp/rsyslog.log")
def test_runcmd(self, class_client: IntegrationInstance):
"""Test runcmd works as expected"""
client = class_client
- assert 'hello world' == client.read_from_file('/var/tmp/runcmd_output')
+ assert "hello world" == client.read_from_file("/var/tmp/runcmd_output")
@retry(tries=30, delay=1)
def test_ssh_import_id(self, class_client: IntegrationInstance):
@@ -160,11 +157,10 @@ class TestCombined:
/home/ubuntu; this will need modification to run on other OSes.
"""
client = class_client
- ssh_output = client.read_from_file(
- "/home/ubuntu/.ssh/authorized_keys")
+ ssh_output = client.read_from_file("/home/ubuntu/.ssh/authorized_keys")
- assert '# ssh-import-id gh:powersj' in ssh_output
- assert '# ssh-import-id lp:smoser' in ssh_output
+ assert "# ssh-import-id gh:powersj" in ssh_output
+ assert "# ssh-import-id lp:smoser" in ssh_output
def test_snap(self, class_client: IntegrationInstance):
"""Integration test for the snap module.
@@ -185,21 +181,22 @@ class TestCombined:
"""
client = class_client
timezone_output = client.execute(
- 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"')
+ 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"'
+ )
assert timezone_output.strip() == "HDT"
def test_no_problems(self, class_client: IntegrationInstance):
"""Test no errors, warnings, or tracebacks"""
client = class_client
- status_file = client.read_from_file('/run/cloud-init/status.json')
- status_json = json.loads(status_file)['v1']
- for stage in ('init', 'init-local', 'modules-config', 'modules-final'):
- assert status_json[stage]['errors'] == []
- result_file = client.read_from_file('/run/cloud-init/result.json')
- result_json = json.loads(result_file)['v1']
- assert result_json['errors'] == []
-
- log = client.read_from_file('/var/log/cloud-init.log')
+ status_file = client.read_from_file("/run/cloud-init/status.json")
+ status_json = json.loads(status_file)["v1"]
+ for stage in ("init", "init-local", "modules-config", "modules-final"):
+ assert status_json[stage]["errors"] == []
+ result_file = client.read_from_file("/run/cloud-init/result.json")
+ result_json = json.loads(result_file)["v1"]
+ assert result_json["errors"] == []
+
+ log = client.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
def test_correct_datasource_detected(
@@ -228,73 +225,81 @@ class TestCombined:
)
def _check_common_metadata(self, data):
- assert data['base64_encoded_keys'] == []
- assert data['merged_cfg'] == 'redacted for non-root user'
+ assert data["base64_encoded_keys"] == []
+ assert data["merged_cfg"] == "redacted for non-root user"
image_spec = ImageSpecification.from_os_image()
- assert data['sys_info']['dist'][0] == image_spec.os
+ assert data["sys_info"]["dist"][0] == image_spec.os
- v1_data = data['v1']
- assert re.match(r'\d\.\d+\.\d+-\d+', v1_data['kernel_release'])
- assert v1_data['variant'] == image_spec.os
- assert v1_data['distro'] == image_spec.os
- assert v1_data['distro_release'] == image_spec.release
- assert v1_data['machine'] == 'x86_64'
- assert re.match(r'3.\d\.\d', v1_data['python_version'])
+ v1_data = data["v1"]
+ assert re.match(r"\d\.\d+\.\d+-\d+", v1_data["kernel_release"])
+ assert v1_data["variant"] == image_spec.os
+ assert v1_data["distro"] == image_spec.os
+ assert v1_data["distro_release"] == image_spec.release
+ assert v1_data["machine"] == "x86_64"
+ assert re.match(r"3.\d\.\d", v1_data["python_version"])
@pytest.mark.lxd_container
def test_instance_json_lxd(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
self._check_common_metadata(data)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'unknown'
- assert v1_data['platform'] == 'lxd'
- assert v1_data['subplatform'] == (
- 'seed-dir (/var/lib/cloud/seed/nocloud-net)')
- assert v1_data['availability_zone'] is None
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'] == client.instance.name
- assert v1_data['region'] is None
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "unknown"
+ assert v1_data["platform"] == "lxd"
+ assert (
+ v1_data["subplatform"]
+ == "seed-dir (/var/lib/cloud/seed/nocloud-net)"
+ )
+ assert v1_data["availability_zone"] is None
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"] == client.instance.name
+ assert v1_data["region"] is None
@pytest.mark.lxd_vm
def test_instance_json_lxd_vm(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
self._check_common_metadata(data)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'unknown'
- assert v1_data['platform'] == 'lxd'
- assert any([
- '/var/lib/cloud/seed/nocloud-net' in v1_data['subplatform'],
- '/dev/sr0' in v1_data['subplatform']
- ])
- assert v1_data['availability_zone'] is None
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'] == client.instance.name
- assert v1_data['region'] is None
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "unknown"
+ assert v1_data["platform"] == "lxd"
+ assert any(
+ [
+ "/var/lib/cloud/seed/nocloud-net" in v1_data["subplatform"],
+ "/dev/sr0" in v1_data["subplatform"],
+ ]
+ )
+ assert v1_data["availability_zone"] is None
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"] == client.instance.name
+ assert v1_data["region"] is None
@pytest.mark.ec2
def test_instance_json_ec2(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'aws'
- assert v1_data['platform'] == 'ec2'
- assert v1_data['subplatform'].startswith('metadata')
- assert v1_data[
- 'availability_zone'] == client.instance.availability_zone
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'].startswith('ip-')
- assert v1_data['region'] == client.cloud.cloud_instance.region
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "aws"
+ assert v1_data["platform"] == "ec2"
+ assert v1_data["subplatform"].startswith("metadata")
+ assert (
+ v1_data["availability_zone"] == client.instance.availability_zone
+ )
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"].startswith("ip-")
+ assert v1_data["region"] == client.cloud.cloud_instance.region
@pytest.mark.gce
def test_instance_json_gce(self, class_client: IntegrationInstance):
diff --git a/tests/integration_tests/modules/test_command_output.py b/tests/integration_tests/modules/test_command_output.py
index 8429873f..96525cac 100644
--- a/tests/integration_tests/modules/test_command_output.py
+++ b/tests/integration_tests/modules/test_command_output.py
@@ -8,7 +8,6 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
-
USER_DATA = """\
#cloud-config
output: { all: "| tee -a /var/log/cloud-init-test-output" }
@@ -18,5 +17,5 @@ final_message: "should be last line in cloud-init-test-output file"
@pytest.mark.user_data(USER_DATA)
def test_runcmd(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init-test-output')
- assert 'should be last line in cloud-init-test-output file' in log
+ log = client.read_from_file("/var/log/cloud-init-test-output")
+ assert "should be last line in cloud-init-test-output file" in log
diff --git a/tests/integration_tests/modules/test_disk_setup.py b/tests/integration_tests/modules/test_disk_setup.py
index 9c9edc46..22277331 100644
--- a/tests/integration_tests/modules/test_disk_setup.py
+++ b/tests/integration_tests/modules/test_disk_setup.py
@@ -1,25 +1,29 @@
import json
import os
-import pytest
from uuid import uuid4
+
+import pytest
from pycloudlib.lxd.instance import LXDInstance
from cloudinit.subp import subp
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import verify_clean_log
-DISK_PATH = '/tmp/test_disk_setup_{}'.format(uuid4())
+DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4())
def setup_and_mount_lxd_disk(instance: LXDInstance):
- subp('lxc config device add {} test-disk-setup-disk disk source={}'.format(
- instance.name, DISK_PATH).split())
+ subp(
+ "lxc config device add {} test-disk-setup-disk disk source={}".format(
+ instance.name, DISK_PATH
+ ).split()
+ )
@pytest.yield_fixture
def create_disk():
# 640k should be enough for anybody
- subp('dd if=/dev/zero of={} bs=1k count=640'.format(DISK_PATH).split())
+ subp("dd if=/dev/zero of={} bs=1k count=640".format(DISK_PATH).split())
yield
os.remove(DISK_PATH)
@@ -54,21 +58,21 @@ class TestDeviceAliases:
"""Test devices aliases work on disk setup/mount"""
def test_device_alias(self, create_disk, client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
assert (
- "updated disk_setup device entry 'my_alias' to '/dev/sdb'"
- ) in log
- assert 'changed my_alias.1 => /dev/sdb1' in log
- assert 'changed my_alias.2 => /dev/sdb2' in log
+ "updated disk_setup device entry 'my_alias' to '/dev/sdb'" in log
+ )
+ assert "changed my_alias.1 => /dev/sdb1" in log
+ assert "changed my_alias.2 => /dev/sdb2" in log
verify_clean_log(log)
- lsblk = json.loads(client.execute('lsblk --json'))
- sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0]
- assert len(sdb['children']) == 2
- assert sdb['children'][0]['name'] == 'sdb1'
- assert sdb['children'][0]['mountpoint'] == '/mnt1'
- assert sdb['children'][1]['name'] == 'sdb2'
- assert sdb['children'][1]['mountpoint'] == '/mnt2'
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 2
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["children"][0]["mountpoint"] == "/mnt1"
+ assert sdb["children"][1]["name"] == "sdb2"
+ assert sdb["children"][1]["mountpoint"] == "/mnt2"
PARTPROBE_USERDATA = """\
@@ -121,13 +125,13 @@ class TestPartProbeAvailability:
def _verify_first_disk_setup(self, client, log):
verify_clean_log(log)
- lsblk = json.loads(client.execute('lsblk --json'))
- sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0]
- assert len(sdb['children']) == 2
- assert sdb['children'][0]['name'] == 'sdb1'
- assert sdb['children'][0]['mountpoint'] == '/mnt1'
- assert sdb['children'][1]['name'] == 'sdb2'
- assert sdb['children'][1]['mountpoint'] == '/mnt2'
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 2
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["children"][0]["mountpoint"] == "/mnt1"
+ assert sdb["children"][1]["name"] == "sdb2"
+ assert sdb["children"][1]["mountpoint"] == "/mnt2"
# Not bionic or xenial because the LXD agent gets in the way of us
# changing the userdata
@@ -148,13 +152,13 @@ class TestPartProbeAvailability:
with a warning and a traceback. When partprobe is in use, everything
should work successfully.
"""
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
self._verify_first_disk_setup(client, log)
# Update our userdata and cloud.cfg to mount then perform new disk
# setup
client.write_to_file(
- '/var/lib/cloud/seed/nocloud-net/user-data',
+ "/var/lib/cloud/seed/nocloud-net/user-data",
UPDATED_PARTPROBE_USERDATA,
)
client.execute(
@@ -162,17 +166,17 @@ class TestPartProbeAvailability:
"/etc/cloud/cloud.cfg"
)
- client.execute('cloud-init clean --logs')
+ client.execute("cloud-init clean --logs")
client.restart()
# Assert new setup works as expected
verify_clean_log(log)
- lsblk = json.loads(client.execute('lsblk --json'))
- sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0]
- assert len(sdb['children']) == 1
- assert sdb['children'][0]['name'] == 'sdb1'
- assert sdb['children'][0]['mountpoint'] == '/mnt3'
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 1
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["children"][0]["mountpoint"] == "/mnt3"
def test_disk_setup_no_partprobe(
self, create_disk, client: IntegrationInstance
@@ -180,11 +184,11 @@ class TestPartProbeAvailability:
"""Ensure disk setup still works as expected without partprobe."""
# We can't do this part in a bootcmd because the path has already
# been found by the time we get to the bootcmd
- client.execute('rm $(which partprobe)')
- client.execute('cloud-init clean --logs')
+ client.execute("rm $(which partprobe)")
+ client.execute("cloud-init clean --logs")
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
self._verify_first_disk_setup(client, log)
- assert 'partprobe' not in log
+ assert "partprobe" not in log
diff --git a/tests/integration_tests/modules/test_growpart.py b/tests/integration_tests/modules/test_growpart.py
index af1e3a15..67251817 100644
--- a/tests/integration_tests/modules/test_growpart.py
+++ b/tests/integration_tests/modules/test_growpart.py
@@ -1,22 +1,26 @@
+import json
import os
-import pytest
import pathlib
-import json
from uuid import uuid4
+
+import pytest
from pycloudlib.lxd.instance import LXDInstance
from cloudinit.subp import subp
from tests.integration_tests.instances import IntegrationInstance
-DISK_PATH = '/tmp/test_disk_setup_{}'.format(uuid4())
+DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4())
def setup_and_mount_lxd_disk(instance: LXDInstance):
- subp('lxc config device add {} test-disk-setup-disk disk source={}'.format(
- instance.name, DISK_PATH).split())
+ subp(
+ "lxc config device add {} test-disk-setup-disk disk source={}".format(
+ instance.name, DISK_PATH
+ ).split()
+ )
-@pytest.fixture(scope='class', autouse=True)
+@pytest.fixture(scope="class", autouse=True)
def create_disk():
"""Create 16M sparse file"""
pathlib.Path(DISK_PATH).touch()
@@ -50,13 +54,15 @@ class TestGrowPart:
"""Test growpart"""
def test_grow_part(self, client: IntegrationInstance):
- """Verify """
- log = client.read_from_file('/var/log/cloud-init.log')
- assert ("cc_growpart.py[INFO]: '/dev/sdb1' resized:"
- " changed (/dev/sdb, 1) from") in log
-
- lsblk = json.loads(client.execute('lsblk --json'))
- sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0]
- assert len(sdb['children']) == 1
- assert sdb['children'][0]['name'] == 'sdb1'
- assert sdb['size'] == '16M'
+ """Verify"""
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert (
+ "cc_growpart.py[INFO]: '/dev/sdb1' resized:"
+ " changed (/dev/sdb, 1) from" in log
+ )
+
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 1
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["size"] == "16M"
diff --git a/tests/integration_tests/modules/test_hotplug.py b/tests/integration_tests/modules/test_hotplug.py
index f5abc86f..0bad761e 100644
--- a/tests/integration_tests/modules/test_hotplug.py
+++ b/tests/integration_tests/modules/test_hotplug.py
@@ -1,8 +1,9 @@
-import pytest
import time
-import yaml
from collections import namedtuple
+import pytest
+import yaml
+
from tests.integration_tests.instances import IntegrationInstance
USER_DATA = """\
@@ -12,28 +13,28 @@ updates:
when: ['hotplug']
"""
-ip_addr = namedtuple('ip_addr', 'interface state ip4 ip6')
+ip_addr = namedtuple("ip_addr", "interface state ip4 ip6")
def _wait_till_hotplug_complete(client, expected_runs=1):
for _ in range(60):
- log = client.read_from_file('/var/log/cloud-init.log')
- if log.count('Exiting hotplug handler') == expected_runs:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if log.count("Exiting hotplug handler") == expected_runs:
return log
time.sleep(1)
- raise Exception('Waiting for hotplug handler failed')
+ raise Exception("Waiting for hotplug handler failed")
def _get_ip_addr(client):
ips = []
- lines = client.execute('ip --brief addr').split('\n')
+ lines = client.execute("ip --brief addr").split("\n")
for line in lines:
attributes = line.split()
interface, state = attributes[0], attributes[1]
ip4_cidr = attributes[2] if len(attributes) > 2 else None
ip6_cidr = attributes[3] if len(attributes) > 3 else None
- ip4 = ip4_cidr.split('/')[0] if ip4_cidr else None
- ip6 = ip6_cidr.split('/')[0] if ip6_cidr else None
+ ip4 = ip4_cidr.split("/")[0] if ip4_cidr else None
+ ip6 = ip6_cidr.split("/")[0] if ip6_cidr else None
ip = ip_addr(interface, state, ip4, ip6)
ips.append(ip)
return ips
@@ -47,10 +48,10 @@ def _get_ip_addr(client):
@pytest.mark.user_data(USER_DATA)
def test_hotplug_add_remove(client: IntegrationInstance):
ips_before = _get_ip_addr(client)
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Exiting hotplug handler' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Exiting hotplug handler" not in log
assert client.execute(
- 'test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules'
+ "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules"
).ok
# Add new NIC
@@ -62,11 +63,11 @@ def test_hotplug_add_remove(client: IntegrationInstance):
assert len(ips_after_add) == len(ips_before) + 1
assert added_ip not in [ip.ip4 for ip in ips_before]
assert added_ip in [ip.ip4 for ip in ips_after_add]
- assert new_addition.state == 'UP'
+ assert new_addition.state == "UP"
- netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml')
+ netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml")
config = yaml.safe_load(netplan_cfg)
- assert new_addition.interface in config['network']['ethernets']
+ assert new_addition.interface in config["network"]["ethernets"]
# Remove new NIC
client.instance.remove_network_interface(added_ip)
@@ -75,37 +76,37 @@ def test_hotplug_add_remove(client: IntegrationInstance):
assert len(ips_after_remove) == len(ips_before)
assert added_ip not in [ip.ip4 for ip in ips_after_remove]
- netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml')
+ netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml")
config = yaml.safe_load(netplan_cfg)
- assert new_addition.interface not in config['network']['ethernets']
+ assert new_addition.interface not in config["network"]["ethernets"]
- assert 'enabled' == client.execute(
- 'cloud-init devel hotplug-hook -s net query'
+ assert "enabled" == client.execute(
+ "cloud-init devel hotplug-hook -s net query"
)
@pytest.mark.openstack
def test_no_hotplug_in_userdata(client: IntegrationInstance):
ips_before = _get_ip_addr(client)
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Exiting hotplug handler' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Exiting hotplug handler" not in log
assert client.execute(
- 'test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules'
+ "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules"
).failed
# Add new NIC
client.instance.add_network_interface()
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'hotplug-hook' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "hotplug-hook" not in log
ips_after_add = _get_ip_addr(client)
if len(ips_after_add) == len(ips_before) + 1:
# We can see the device, but it should not have been brought up
new_ip = [ip for ip in ips_after_add if ip not in ips_before][0]
- assert new_ip.state == 'DOWN'
+ assert new_ip.state == "DOWN"
else:
assert len(ips_after_add) == len(ips_before)
- assert 'disabled' == client.execute(
- 'cloud-init devel hotplug-hook -s net query'
+ assert "disabled" == client.execute(
+ "cloud-init devel hotplug-hook -s net query"
)
diff --git a/tests/integration_tests/modules/test_jinja_templating.py b/tests/integration_tests/modules/test_jinja_templating.py
index fe8eff1a..7788c6f0 100644
--- a/tests/integration_tests/modules/test_jinja_templating.py
+++ b/tests/integration_tests/modules/test_jinja_templating.py
@@ -4,7 +4,6 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import verify_ordered_items_in_text
-
USER_DATA = """\
## template: jinja
#cloud-config
@@ -23,12 +22,12 @@ def test_runcmd_with_variable_substitution(client: IntegrationInstance):
we can also substitute variables from instance-data-sensitive
LP: #1931392.
"""
- hostname = client.execute('hostname').stdout.strip()
+ hostname = client.execute("hostname").stdout.strip()
expected = [
hostname,
- ('Merged cloud-init system config from /etc/cloud/cloud.cfg and '
- '/etc/cloud/cloud.cfg.d/'),
- hostname
+ "Merged cloud-init system config from /etc/cloud/cloud.cfg and "
+ "/etc/cloud/cloud.cfg.d/",
+ hostname,
]
- output = client.read_from_file('/var/tmp/runcmd_output')
+ output = client.read_from_file("/var/tmp/runcmd_output")
verify_ordered_items_in_text(expected, output)
diff --git a/tests/integration_tests/modules/test_keys_to_console.py b/tests/integration_tests/modules/test_keys_to_console.py
index e79db3c7..50899982 100644
--- a/tests/integration_tests/modules/test_keys_to_console.py
+++ b/tests/integration_tests/modules/test_keys_to_console.py
@@ -36,6 +36,7 @@ users:
@pytest.mark.user_data(BLACKLIST_USER_DATA)
class TestKeysToConsoleBlacklist:
"""Test that the blacklist options work as expected."""
+
@pytest.mark.parametrize("key_type", ["DSA", "ECDSA"])
def test_excluded_keys(self, class_client, key_type):
syslog = class_client.read_from_file("/var/log/syslog")
@@ -55,6 +56,7 @@ class TestAllKeysToConsoleBlacklist:
"""Test that when key blacklist contains all key types that
no header/footer are output.
"""
+
def test_header_excluded(self, class_client):
syslog = class_client.read_from_file("/var/log/syslog")
assert "BEGIN SSH HOST KEY FINGERPRINTS" not in syslog
@@ -67,6 +69,7 @@ class TestAllKeysToConsoleBlacklist:
@pytest.mark.user_data(DISABLED_USER_DATA)
class TestKeysToConsoleDisabled:
"""Test that output can be fully disabled."""
+
@pytest.mark.parametrize("key_type", ["DSA", "ECDSA", "ED25519", "RSA"])
def test_keys_excluded(self, class_client, key_type):
syslog = class_client.read_from_file("/var/log/syslog")
@@ -90,7 +93,7 @@ class TestKeysToConsoleEnabled:
"""Test that output can be enabled disabled."""
def test_duplicate_messaging_console_log(self, class_client):
- class_client.execute('cloud-init status --wait --long').ok
+ class_client.execute("cloud-init status --wait --long").ok
try:
console_log = class_client.instance.console_log()
except NotImplementedError:
@@ -98,13 +101,13 @@ class TestKeysToConsoleEnabled:
# log
pytest.skip("NotImplementedError when requesting console log")
return
- if console_log.lower() == 'no console output':
+ if console_log.lower() == "no console output":
# This test retries because we might not have the full console log
# on the first fetch. However, if we have no console output
# at all, we don't want to keep retrying as that would trigger
# another 5 minute wait on the pycloudlib side, which could
# leave us waiting for a couple hours
- pytest.fail('no console output')
+ pytest.fail("no console output")
return
msg = "no authorized SSH keys fingerprints found for user barfoo."
assert 1 == console_log.count(msg)
diff --git a/tests/integration_tests/modules/test_lxd_bridge.py b/tests/integration_tests/modules/test_lxd_bridge.py
index 65dce3c7..2cb3f4f3 100644
--- a/tests/integration_tests/modules/test_lxd_bridge.py
+++ b/tests/integration_tests/modules/test_lxd_bridge.py
@@ -8,7 +8,6 @@ import yaml
from tests.integration_tests.util import verify_clean_log
-
USER_DATA = """\
#cloud-config
lxd:
@@ -29,7 +28,6 @@ lxd:
@pytest.mark.no_container
@pytest.mark.user_data(USER_DATA)
class TestLxdBridge:
-
@pytest.mark.parametrize("binary_name", ["lxc", "lxd"])
def test_binaries_installed(self, class_client, binary_name):
"""Check that the expected LXD binaries are installed"""
diff --git a/tests/integration_tests/modules/test_ntp_servers.py b/tests/integration_tests/modules/test_ntp_servers.py
index c777a641..fc62e63b 100644
--- a/tests/integration_tests/modules/test_ntp_servers.py
+++ b/tests/integration_tests/modules/test_ntp_servers.py
@@ -9,8 +9,8 @@ and ``tests/cloud_tests/testcases/modules/ntp_chrony.yaml``)
"""
import re
-import yaml
import pytest
+import yaml
from tests.integration_tests.instances import IntegrationInstance
@@ -33,13 +33,13 @@ EXPECTED_POOLS = yaml.safe_load(USER_DATA)["ntp"]["pools"]
@pytest.mark.user_data(USER_DATA)
class TestNtpServers:
-
def test_ntp_installed(self, class_client: IntegrationInstance):
"""Test that `ntpd --version` succeeds, indicating installation."""
assert class_client.execute("ntpd --version").ok
- def test_dist_config_file_is_empty(self,
- class_client: IntegrationInstance):
+ def test_dist_config_file_is_empty(
+ self, class_client: IntegrationInstance
+ ):
"""Test that the distributed config file is empty.
(This test is skipped on all currently supported Ubuntu releases, so
@@ -56,13 +56,13 @@ class TestNtpServers:
assert re.search(
r"^server {} iburst".format(expected_server),
ntp_conf,
- re.MULTILINE
+ re.MULTILINE,
)
for expected_pool in EXPECTED_POOLS:
assert re.search(
r"^pool {} iburst".format(expected_pool),
ntp_conf,
- re.MULTILINE
+ re.MULTILINE,
)
def test_ntpq_servers(self, class_client: IntegrationInstance):
@@ -84,12 +84,12 @@ ntp:
@pytest.mark.user_data(CHRONY_DATA)
def test_chrony(client: IntegrationInstance):
- if client.execute('test -f /etc/chrony.conf').ok:
- chrony_conf = '/etc/chrony.conf'
+ if client.execute("test -f /etc/chrony.conf").ok:
+ chrony_conf = "/etc/chrony.conf"
else:
- chrony_conf = '/etc/chrony/chrony.conf'
+ chrony_conf = "/etc/chrony/chrony.conf"
contents = client.read_from_file(chrony_conf)
- assert 'server 172.16.15.14' in contents
+ assert "server 172.16.15.14" in contents
TIMESYNCD_DATA = """\
@@ -105,9 +105,9 @@ ntp:
@pytest.mark.user_data(TIMESYNCD_DATA)
def test_timesyncd(client: IntegrationInstance):
contents = client.read_from_file(
- '/etc/systemd/timesyncd.conf.d/cloud-init.conf'
+ "/etc/systemd/timesyncd.conf.d/cloud-init.conf"
)
- assert 'NTP=172.16.15.14' in contents
+ assert "NTP=172.16.15.14" in contents
EMPTY_NTP = """\
@@ -121,8 +121,8 @@ ntp:
@pytest.mark.user_data(EMPTY_NTP)
def test_empty_ntp(client: IntegrationInstance):
- assert client.execute('ntpd --version').ok
- assert client.execute('test -f /etc/ntp.conf.dist').failed
- assert 'pool.ntp.org iburst' in client.execute(
+ assert client.execute("ntpd --version").ok
+ assert client.execute("test -f /etc/ntp.conf.dist").failed
+ assert "pool.ntp.org iburst" in client.execute(
'grep -v "^#" /etc/ntp.conf'
)
diff --git a/tests/integration_tests/modules/test_package_update_upgrade_install.py b/tests/integration_tests/modules/test_package_update_upgrade_install.py
index 28d741bc..d668d81c 100644
--- a/tests/integration_tests/modules/test_package_update_upgrade_install.py
+++ b/tests/integration_tests/modules/test_package_update_upgrade_install.py
@@ -13,8 +13,8 @@ NOTE: the testcase for this looks for the command in history.log as
"""
import re
-import pytest
+import pytest
USER_DATA = """\
#cloud-config
@@ -29,7 +29,6 @@ package_upgrade: true
@pytest.mark.ubuntu
@pytest.mark.user_data(USER_DATA)
class TestPackageUpdateUpgradeInstall:
-
def assert_package_installed(self, pkg_out, name, version=None):
"""Check dpkg-query --show output for matching package name.
@@ -38,7 +37,8 @@ class TestPackageUpdateUpgradeInstall:
version.
"""
pkg_match = re.search(
- "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE)
+ "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE
+ )
if pkg_match:
installed_version = pkg_match.group("version")
if not version:
@@ -46,8 +46,10 @@ class TestPackageUpdateUpgradeInstall:
if installed_version.startswith(version):
return # Success
raise AssertionError(
- "Expected package version %s-%s not found. Found %s" %
- name, version, installed_version)
+ "Expected package version %s-%s not found. Found %s" % name,
+ version,
+ installed_version,
+ )
raise AssertionError("Package not installed: %s" % name)
def test_new_packages_are_installed(self, class_client):
@@ -58,11 +60,13 @@ class TestPackageUpdateUpgradeInstall:
def test_packages_were_updated(self, class_client):
out = class_client.execute(
- "grep ^Commandline: /var/log/apt/history.log")
+ "grep ^Commandline: /var/log/apt/history.log"
+ )
assert (
"Commandline: /usr/bin/apt-get --option=Dpkg::Options"
"::=--force-confold --option=Dpkg::options::=--force-unsafe-io "
- "--assume-yes --quiet install sl tree") in out
+ "--assume-yes --quiet install sl tree" in out
+ )
def test_packages_were_upgraded(self, class_client):
"""Test cloud-init-output for install & upgrade stuff."""
diff --git a/tests/integration_tests/modules/test_persistence.py b/tests/integration_tests/modules/test_persistence.py
index 00fdeaea..33527e1e 100644
--- a/tests/integration_tests/modules/test_persistence.py
+++ b/tests/integration_tests/modules/test_persistence.py
@@ -10,21 +10,23 @@ from tests.integration_tests.util import (
verify_ordered_items_in_text,
)
-
-PICKLE_PATH = Path('/var/lib/cloud/instance/obj.pkl')
-TEST_PICKLE = ASSETS_DIR / 'trusty_with_mime.pkl'
+PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl")
+TEST_PICKLE = ASSETS_DIR / "trusty_with_mime.pkl"
@pytest.mark.lxd_container
def test_log_message_on_missing_version_file(client: IntegrationInstance):
client.push_file(TEST_PICKLE, PICKLE_PATH)
client.restart()
- assert client.execute('cloud-init status --wait').ok
- log = client.read_from_file('/var/log/cloud-init.log')
- verify_ordered_items_in_text([
- "Unable to unpickle datasource: 'MIMEMultipart' object has no "
- "attribute 'policy'. Ignoring current cache.",
- 'no cache found',
- 'Searching for local data source',
- 'SUCCESS: found local data from DataSourceNoCloud'
- ], log)
+ assert client.execute("cloud-init status --wait").ok
+ log = client.read_from_file("/var/log/cloud-init.log")
+ verify_ordered_items_in_text(
+ [
+ "Unable to unpickle datasource: 'MIMEMultipart' object has no "
+ "attribute 'policy'. Ignoring current cache.",
+ "no cache found",
+ "Searching for local data source",
+ "SUCCESS: found local data from DataSourceNoCloud",
+ ],
+ log,
+ )
diff --git a/tests/integration_tests/modules/test_power_state_change.py b/tests/integration_tests/modules/test_power_state_change.py
index 5f3a32ac..a629029d 100644
--- a/tests/integration_tests/modules/test_power_state_change.py
+++ b/tests/integration_tests/modules/test_power_state_change.py
@@ -30,7 +30,7 @@ def _detect_reboot(instance: IntegrationInstance):
instance.instance.wait()
for _ in range(600):
try:
- log = instance.read_from_file('/var/log/cloud-init.log')
+ log = instance.read_from_file("/var/log/cloud-init.log")
boot_count = log.count("running 'init-local'")
if boot_count == 1:
instance.instance.wait()
@@ -40,11 +40,11 @@ def _detect_reboot(instance: IntegrationInstance):
pass
time.sleep(1)
else:
- raise Exception('Could not detect reboot')
+ raise Exception("Could not detect reboot")
def _can_connect(instance):
- return instance.execute('true').ok
+ return instance.execute("true").ok
# This test is marked unstable because even though it should be able to
@@ -55,36 +55,44 @@ def _can_connect(instance):
@pytest.mark.ubuntu
@pytest.mark.lxd_container
class TestPowerChange:
- @pytest.mark.parametrize('mode,delay,timeout,expected', [
- ('poweroff', 'now', '10', 'will execute: shutdown -P now msg'),
- ('reboot', 'now', '0', 'will execute: shutdown -r now msg'),
- ('halt', '+1', '0', 'will execute: shutdown -H +1 msg'),
- ])
- def test_poweroff(self, session_cloud: IntegrationCloud,
- mode, delay, timeout, expected):
+ @pytest.mark.parametrize(
+ "mode,delay,timeout,expected",
+ [
+ ("poweroff", "now", "10", "will execute: shutdown -P now msg"),
+ ("reboot", "now", "0", "will execute: shutdown -r now msg"),
+ ("halt", "+1", "0", "will execute: shutdown -H +1 msg"),
+ ],
+ )
+ def test_poweroff(
+ self, session_cloud: IntegrationCloud, mode, delay, timeout, expected
+ ):
with session_cloud.launch(
user_data=USER_DATA.format(
- delay=delay, mode=mode, timeout=timeout, condition='true'),
- launch_kwargs={'wait': False},
+ delay=delay, mode=mode, timeout=timeout, condition="true"
+ ),
+ launch_kwargs={"wait": False},
) as instance:
- if mode == 'reboot':
+ if mode == "reboot":
_detect_reboot(instance)
else:
instance.instance.wait_for_stop()
instance.instance.start(wait=True)
- log = instance.read_from_file('/var/log/cloud-init.log')
+ log = instance.read_from_file("/var/log/cloud-init.log")
assert _can_connect(instance)
lines_to_check = [
- 'Running module power-state-change',
+ "Running module power-state-change",
expected,
"running 'init-local'",
- 'config-power-state-change already ran',
+ "config-power-state-change already ran",
]
verify_ordered_items_in_text(lines_to_check, log)
- @pytest.mark.user_data(USER_DATA.format(delay='0', mode='poweroff',
- timeout='0', condition='false'))
+ @pytest.mark.user_data(
+ USER_DATA.format(
+ delay="0", mode="poweroff", timeout="0", condition="false"
+ )
+ )
def test_poweroff_false_condition(self, client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
assert _can_connect(client)
- assert 'Condition was false. Will not perform state change' in log
+ assert "Condition was false. Will not perform state change" in log
diff --git a/tests/integration_tests/modules/test_puppet.py b/tests/integration_tests/modules/test_puppet.py
index f40a6ca3..1bd9cee4 100644
--- a/tests/integration_tests/modules/test_puppet.py
+++ b/tests/integration_tests/modules/test_puppet.py
@@ -15,9 +15,9 @@ puppet:
@pytest.mark.user_data(SERVICE_DATA)
def test_puppet_service(client: IntegrationInstance):
"""Basic test that puppet gets installed and runs."""
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
- assert client.execute('systemctl is-active puppet').ok
+ assert client.execute("systemctl is-active puppet").ok
assert "Running command ['puppet', 'agent'" not in log
@@ -35,5 +35,5 @@ puppet:
@pytest.mark.user_data(EXEC_DATA)
def test_pupet_exec(client: IntegrationInstance):
"""Basic test that puppet gets installed and runs."""
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
assert "Running command ['puppet', 'agent', '--noop']" in log
diff --git a/tests/integration_tests/modules/test_set_hostname.py b/tests/integration_tests/modules/test_set_hostname.py
index e7f7f6b6..ae0aeae9 100644
--- a/tests/integration_tests/modules/test_set_hostname.py
+++ b/tests/integration_tests/modules/test_set_hostname.py
@@ -11,7 +11,6 @@ after the system is boot.
import pytest
-
USER_DATA_HOSTNAME = """\
#cloud-config
hostname: cloudinit2
@@ -34,7 +33,6 @@ fqdn: cloudinit2.test.io
@pytest.mark.ci
class TestHostname:
-
@pytest.mark.user_data(USER_DATA_HOSTNAME)
def test_hostname(self, client):
hostname_output = client.execute("hostname")
@@ -59,6 +57,8 @@ class TestHostname:
assert "cloudinit2.i9n.cloud-init.io" in fqdn_output.strip()
host_output = client.execute("grep ^127 /etc/hosts")
- assert '127.0.1.1 {} {}'.format(
- fqdn_output, hostname_output) in host_output
- assert '127.0.0.1 localhost' in host_output
+ assert (
+ "127.0.1.1 {} {}".format(fqdn_output, hostname_output)
+ in host_output
+ )
+ assert "127.0.0.1 localhost" in host_output
diff --git a/tests/integration_tests/modules/test_set_password.py b/tests/integration_tests/modules/test_set_password.py
index ac9db19d..e0f8b692 100644
--- a/tests/integration_tests/modules/test_set_password.py
+++ b/tests/integration_tests/modules/test_set_password.py
@@ -15,7 +15,6 @@ import yaml
from tests.integration_tests.util import retry
-
COMMON_USER_DATA = """\
#cloud-config
ssh_pwauth: yes
@@ -42,7 +41,9 @@ Uh69tP4GSrGW5XKHxMLiKowJgm/"
lock_passwd: false
"""
-LIST_USER_DATA = COMMON_USER_DATA + """
+LIST_USER_DATA = (
+ COMMON_USER_DATA
+ + """
chpasswd:
list:
- tom:mypassword123!
@@ -50,8 +51,11 @@ chpasswd:
- harry:RANDOM
- mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89
"""
+)
-STRING_USER_DATA = COMMON_USER_DATA + """
+STRING_USER_DATA = (
+ COMMON_USER_DATA
+ + """
chpasswd:
list: |
tom:mypassword123!
@@ -59,6 +63,7 @@ chpasswd:
harry:RANDOM
mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89
"""
+)
USERS_DICTS = yaml.safe_load(COMMON_USER_DATA)["users"]
USERS_PASSWD_VALUES = {
@@ -141,13 +146,13 @@ class Mixin:
# log
pytest.skip("NotImplementedError when requesting console log")
return
- if console_log.lower() == 'no console output':
+ if console_log.lower() == "no console output":
# This test retries because we might not have the full console log
# on the first fetch. However, if we have no console output
# at all, we don't want to keep retrying as that would trigger
# another 5 minute wait on the pycloudlib side, which could
# leave us waiting for a couple hours
- pytest.fail('no console output')
+ pytest.fail("no console output")
return
assert "dick:" in console_log
assert "harry:" in console_log
diff --git a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
index cf14d0b0..89b49576 100644
--- a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
+++ b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
@@ -14,7 +14,6 @@ import pytest
from tests.integration_tests.util import retry
-
USER_DATA_SSH_AUTHKEY_DISABLE = """\
#cloud-config
no_ssh_fingerprints: true
@@ -32,13 +31,13 @@ ssh_authorized_keys:
@pytest.mark.ci
class TestSshAuthkeyFingerprints:
-
@pytest.mark.user_data(USER_DATA_SSH_AUTHKEY_DISABLE)
def test_ssh_authkey_fingerprints_disable(self, client):
cloudinit_output = client.read_from_file("/var/log/cloud-init.log")
assert (
"Skipping module named ssh-authkey-fingerprints, "
- "logging of SSH fingerprints disabled") in cloudinit_output
+ "logging of SSH fingerprints disabled" in cloudinit_output
+ )
# retry decorator here because it can take some time to be reflected
# in syslog
@@ -47,7 +46,7 @@ class TestSshAuthkeyFingerprints:
def test_ssh_authkey_fingerprints_enable(self, client):
syslog_output = client.read_from_file("/var/log/syslog")
- assert re.search(r'256 SHA256:.*(ECDSA)', syslog_output) is not None
- assert re.search(r'256 SHA256:.*(ED25519)', syslog_output) is not None
- assert re.search(r'1024 SHA256:.*(DSA)', syslog_output) is None
- assert re.search(r'2048 SHA256:.*(RSA)', syslog_output) is None
+ assert re.search(r"256 SHA256:.*(ECDSA)", syslog_output) is not None
+ assert re.search(r"256 SHA256:.*(ED25519)", syslog_output) is not None
+ assert re.search(r"1024 SHA256:.*(DSA)", syslog_output) is None
+ assert re.search(r"2048 SHA256:.*(RSA)", syslog_output) is None
diff --git a/tests/integration_tests/modules/test_ssh_generate.py b/tests/integration_tests/modules/test_ssh_generate.py
index 60c36982..1dd0adf1 100644
--- a/tests/integration_tests/modules/test_ssh_generate.py
+++ b/tests/integration_tests/modules/test_ssh_generate.py
@@ -10,7 +10,6 @@ keys were created.
import pytest
-
USER_DATA = """\
#cloud-config
ssh_genkeytypes:
@@ -23,28 +22,27 @@ authkey_hash: sha512
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestSshKeysGenerate:
-
@pytest.mark.parametrize(
- "ssh_key_path", (
+ "ssh_key_path",
+ (
"/etc/ssh/ssh_host_dsa_key.pub",
"/etc/ssh/ssh_host_dsa_key",
"/etc/ssh/ssh_host_rsa_key.pub",
"/etc/ssh/ssh_host_rsa_key",
- )
+ ),
)
def test_ssh_keys_not_generated(self, ssh_key_path, class_client):
- out = class_client.execute(
- "test -e {}".format(ssh_key_path)
- )
+ out = class_client.execute("test -e {}".format(ssh_key_path))
assert out.failed
@pytest.mark.parametrize(
- "ssh_key_path", (
+ "ssh_key_path",
+ (
"/etc/ssh/ssh_host_ecdsa_key.pub",
"/etc/ssh/ssh_host_ecdsa_key",
"/etc/ssh/ssh_host_ed25519_key.pub",
"/etc/ssh/ssh_host_ed25519_key",
- )
+ ),
)
def test_ssh_keys_generated(self, ssh_key_path, class_client):
out = class_client.read_from_file(ssh_key_path)
diff --git a/tests/integration_tests/modules/test_ssh_keys_provided.py b/tests/integration_tests/modules/test_ssh_keys_provided.py
index 6aae96ae..b79f18eb 100644
--- a/tests/integration_tests/modules/test_ssh_keys_provided.py
+++ b/tests/integration_tests/modules/test_ssh_keys_provided.py
@@ -9,7 +9,6 @@ system.
import pytest
-
USER_DATA = """\
#cloud-config
disable_root: false
@@ -82,44 +81,33 @@ ssh_keys:
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestSshKeysProvided:
-
@pytest.mark.parametrize(
"config_path,expected_out",
(
(
"/etc/ssh/ssh_host_dsa_key.pub",
- (
- "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R"
- "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM"
- ),
+ "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R"
+ "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM",
),
(
"/etc/ssh/ssh_host_dsa_key",
- (
- "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr"
- "hOVAfzZ6+jklP"
- ),
+ "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr"
+ "hOVAfzZ6+jklP",
),
(
"/etc/ssh/ssh_host_rsa_key.pub",
- (
- "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT"
- "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4"
- ),
+ "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT"
+ "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4",
),
(
"/etc/ssh/ssh_host_rsa_key",
- (
- "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un"
- "RQvLZpMRdywBm"
- ),
+ "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un"
+ "RQvLZpMRdywBm",
),
(
"/etc/ssh/ssh_host_rsa_key-cert.pub",
- (
- "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg"
- "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD"
- ),
+ "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg"
+ "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD",
),
(
"/etc/ssh/sshd_config",
@@ -127,33 +115,25 @@ class TestSshKeysProvided:
),
(
"/etc/ssh/ssh_host_ecdsa_key.pub",
- (
- "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB"
- "BBFsS5Tvky/IC/dXhE/afxxU"
- ),
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB"
+ "BBFsS5Tvky/IC/dXhE/afxxU",
),
(
"/etc/ssh/ssh_host_ecdsa_key",
- (
- "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY"
- "5mpZqxgX4vcgb"
- ),
+ "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY"
+ "5mpZqxgX4vcgb",
),
(
"/etc/ssh/ssh_host_ed25519_key.pub",
- (
- "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6"
- "G15dqjQ2XkNVOEnb5"
- ),
+ "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6"
+ "G15dqjQ2XkNVOEnb5",
),
(
"/etc/ssh/ssh_host_ed25519_key",
- (
- "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT"
- "OhteXao0Nl5DVThJ2+Q"
- ),
+ "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT"
+ "OhteXao0Nl5DVThJ2+Q",
),
- )
+ ),
)
def test_ssh_provided_keys(self, config_path, expected_out, class_client):
out = class_client.read_from_file(config_path).strip()
diff --git a/tests/integration_tests/modules/test_ssh_keysfile.py b/tests/integration_tests/modules/test_ssh_keysfile.py
index b39454e6..8330a1ce 100644
--- a/tests/integration_tests/modules/test_ssh_keysfile.py
+++ b/tests/integration_tests/modules/test_ssh_keysfile.py
@@ -1,15 +1,16 @@
+from io import StringIO
+
import paramiko
import pytest
-from io import StringIO
from paramiko.ssh_exception import SSHException
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import get_test_rsa_keypair
-TEST_USER1_KEYS = get_test_rsa_keypair('test1')
-TEST_USER2_KEYS = get_test_rsa_keypair('test2')
-TEST_DEFAULT_KEYS = get_test_rsa_keypair('test3')
+TEST_USER1_KEYS = get_test_rsa_keypair("test1")
+TEST_USER2_KEYS = get_test_rsa_keypair("test2")
+TEST_DEFAULT_KEYS = get_test_rsa_keypair("test3")
_USERDATA = """\
#cloud-config
@@ -26,7 +27,7 @@ users:
ssh_authorized_keys:
- {user2}
""".format(
- bootcmd='{bootcmd}',
+ bootcmd="{bootcmd}",
default=TEST_DEFAULT_KEYS.public_key,
user1=TEST_USER1_KEYS.public_key,
user2=TEST_USER2_KEYS.public_key,
@@ -37,9 +38,9 @@ def common_verify(client, expected_keys):
for user, filename, keys in expected_keys:
# Ensure key is in the key file
contents = client.read_from_file(filename)
- if user in ['ubuntu', 'root']:
- lines = contents.split('\n')
- if user == 'root':
+ if user in ["ubuntu", "root"]:
+ lines = contents.split("\n")
+ if user == "root":
# Our personal public key gets added by pycloudlib in
# addition to the default `ssh_authorized_keys`
assert len(lines) == 2
@@ -54,8 +55,9 @@ def common_verify(client, expected_keys):
# Ensure we can actually connect
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- paramiko_key = paramiko.RSAKey.from_private_key(StringIO(
- keys.private_key))
+ paramiko_key = paramiko.RSAKey.from_private_key(
+ StringIO(keys.private_key)
+ )
# Will fail with AuthenticationException if
# we cannot connect
@@ -71,8 +73,11 @@ def common_verify(client, expected_keys):
other_users = [u[0] for u in expected_keys if u[2] != keys]
for other_user in other_users:
with pytest.raises(SSHException):
- print('trying to connect as {} with key from {}'.format(
- other_user, user))
+ print(
+ "trying to connect as {} with key from {}".format(
+ other_user, user
+ )
+ )
ssh.connect(
client.instance.ip,
username=other_user,
@@ -83,37 +88,38 @@ def common_verify(client, expected_keys):
# Ensure we haven't messed with any /home permissions
# See LP: #1940233
- home_dir = '/home/{}'.format(user)
+ home_dir = "/home/{}".format(user)
# Home permissions aren't consistent between releases. On ubuntu
# this can change to 750 once focal is unsupported.
if ImageSpecification.from_os_image().release in ("bionic", "focal"):
- home_perms = '755'
+ home_perms = "755"
else:
- home_perms = '750'
- if user == 'root':
- home_dir = '/root'
- home_perms = '700'
- assert '{} {}'.format(user, home_perms) == client.execute(
+ home_perms = "750"
+ if user == "root":
+ home_dir = "/root"
+ home_perms = "700"
+ assert "{} {}".format(user, home_perms) == client.execute(
'stat -c "%U %a" {}'.format(home_dir)
)
if client.execute("test -d {}/.ssh".format(home_dir)).ok:
- assert '{} 700'.format(user) == client.execute(
+ assert "{} 700".format(user) == client.execute(
'stat -c "%U %a" {}/.ssh'.format(home_dir)
)
- assert '{} 600'.format(user) == client.execute(
+ assert "{} 600".format(user) == client.execute(
'stat -c "%U %a" {}'.format(filename)
)
# Also ensure ssh-keygen works as expected
- client.execute('mkdir {}/.ssh'.format(home_dir))
+ client.execute("mkdir {}/.ssh".format(home_dir))
assert client.execute(
"ssh-keygen -b 2048 -t rsa -f {}/.ssh/id_rsa -q -N ''".format(
- home_dir)
+ home_dir
+ )
).ok
- assert client.execute('test -f {}/.ssh/id_rsa'.format(home_dir))
- assert client.execute('test -f {}/.ssh/id_rsa.pub'.format(home_dir))
+ assert client.execute("test -f {}/.ssh/id_rsa".format(home_dir))
+ assert client.execute("test -f {}/.ssh/id_rsa.pub".format(home_dir))
- assert 'root 755' == client.execute('stat -c "%U %a" /home')
+ assert "root 755" == client.execute('stat -c "%U %a" /home')
DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""')
@@ -123,75 +129,96 @@ DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""')
@pytest.mark.user_data(DEFAULT_KEYS_USERDATA)
def test_authorized_keys_default(client: IntegrationInstance):
expected_keys = [
- ('test_user1', '/home/test_user1/.ssh/authorized_keys',
- TEST_USER1_KEYS),
- ('test_user2', '/home/test_user2/.ssh/authorized_keys',
- TEST_USER2_KEYS),
- ('ubuntu', '/home/ubuntu/.ssh/authorized_keys',
- TEST_DEFAULT_KEYS),
- ('root', '/root/.ssh/authorized_keys', TEST_DEFAULT_KEYS),
+ (
+ "test_user1",
+ "/home/test_user1/.ssh/authorized_keys",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/home/test_user2/.ssh/authorized_keys",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/home/ubuntu/.ssh/authorized_keys", TEST_DEFAULT_KEYS),
+ ("root", "/root/.ssh/authorized_keys", TEST_DEFAULT_KEYS),
]
common_verify(client, expected_keys)
-AUTHORIZED_KEYS2_USERDATA = _USERDATA.format(bootcmd=(
- "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
- "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' "
- "/etc/ssh/sshd_config"))
+AUTHORIZED_KEYS2_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' "
+ "/etc/ssh/sshd_config"
+ )
+)
@pytest.mark.ubuntu
@pytest.mark.user_data(AUTHORIZED_KEYS2_USERDATA)
def test_authorized_keys2(client: IntegrationInstance):
expected_keys = [
- ('test_user1', '/home/test_user1/.ssh/authorized_keys2',
- TEST_USER1_KEYS),
- ('test_user2', '/home/test_user2/.ssh/authorized_keys2',
- TEST_USER2_KEYS),
- ('ubuntu', '/home/ubuntu/.ssh/authorized_keys2',
- TEST_DEFAULT_KEYS),
- ('root', '/root/.ssh/authorized_keys2', TEST_DEFAULT_KEYS),
+ (
+ "test_user1",
+ "/home/test_user1/.ssh/authorized_keys2",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/home/test_user2/.ssh/authorized_keys2",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/home/ubuntu/.ssh/authorized_keys2", TEST_DEFAULT_KEYS),
+ ("root", "/root/.ssh/authorized_keys2", TEST_DEFAULT_KEYS),
]
common_verify(client, expected_keys)
-NESTED_KEYS_USERDATA = _USERDATA.format(bootcmd=(
- "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
- "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' "
- "/etc/ssh/sshd_config"))
+NESTED_KEYS_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' "
+ "/etc/ssh/sshd_config"
+ )
+)
@pytest.mark.ubuntu
@pytest.mark.user_data(NESTED_KEYS_USERDATA)
def test_nested_keys(client: IntegrationInstance):
expected_keys = [
- ('test_user1', '/home/test_user1/foo/bar/ssh/keys',
- TEST_USER1_KEYS),
- ('test_user2', '/home/test_user2/foo/bar/ssh/keys',
- TEST_USER2_KEYS),
- ('ubuntu', '/home/ubuntu/foo/bar/ssh/keys',
- TEST_DEFAULT_KEYS),
- ('root', '/root/foo/bar/ssh/keys', TEST_DEFAULT_KEYS),
+ ("test_user1", "/home/test_user1/foo/bar/ssh/keys", TEST_USER1_KEYS),
+ ("test_user2", "/home/test_user2/foo/bar/ssh/keys", TEST_USER2_KEYS),
+ ("ubuntu", "/home/ubuntu/foo/bar/ssh/keys", TEST_DEFAULT_KEYS),
+ ("root", "/root/foo/bar/ssh/keys", TEST_DEFAULT_KEYS),
]
common_verify(client, expected_keys)
-EXTERNAL_KEYS_USERDATA = _USERDATA.format(bootcmd=(
- "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
- "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' "
- "/etc/ssh/sshd_config"))
+EXTERNAL_KEYS_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' "
+ "/etc/ssh/sshd_config"
+ )
+)
@pytest.mark.ubuntu
@pytest.mark.user_data(EXTERNAL_KEYS_USERDATA)
def test_external_keys(client: IntegrationInstance):
expected_keys = [
- ('test_user1', '/etc/ssh/authorized_keys/test_user1/keys',
- TEST_USER1_KEYS),
- ('test_user2', '/etc/ssh/authorized_keys/test_user2/keys',
- TEST_USER2_KEYS),
- ('ubuntu', '/etc/ssh/authorized_keys/ubuntu/keys',
- TEST_DEFAULT_KEYS),
- ('root', '/etc/ssh/authorized_keys/root/keys', TEST_DEFAULT_KEYS),
+ (
+ "test_user1",
+ "/etc/ssh/authorized_keys/test_user1/keys",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/etc/ssh/authorized_keys/test_user2/keys",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/etc/ssh/authorized_keys/ubuntu/keys", TEST_DEFAULT_KEYS),
+ ("root", "/etc/ssh/authorized_keys/root/keys", TEST_DEFAULT_KEYS),
]
common_verify(client, expected_keys)
diff --git a/tests/integration_tests/modules/test_user_events.py b/tests/integration_tests/modules/test_user_events.py
index fffa0746..e4a4241f 100644
--- a/tests/integration_tests/modules/test_user_events.py
+++ b/tests/integration_tests/modules/test_user_events.py
@@ -3,8 +3,9 @@
This is currently limited to applying network config on BOOT events.
"""
-import pytest
import re
+
+import pytest
import yaml
from tests.integration_tests.instances import IntegrationInstance
@@ -13,16 +14,16 @@ from tests.integration_tests.instances import IntegrationInstance
def _add_dummy_bridge_to_netplan(client: IntegrationInstance):
# Update netplan configuration to ensure it doesn't change on reboot
netplan = yaml.safe_load(
- client.execute('cat /etc/netplan/50-cloud-init.yaml')
+ client.execute("cat /etc/netplan/50-cloud-init.yaml")
)
# Just a dummy bridge to do nothing
try:
- netplan['network']['bridges']['dummy0'] = {'dhcp4': False}
+ netplan["network"]["bridges"]["dummy0"] = {"dhcp4": False}
except KeyError:
- netplan['network']['bridges'] = {'dummy0': {'dhcp4': False}}
+ netplan["network"]["bridges"] = {"dummy0": {"dhcp4": False}}
dumped_netplan = yaml.dump(netplan)
- client.write_to_file('/etc/netplan/50-cloud-init.yaml', dumped_netplan)
+ client.write_to_file("/etc/netplan/50-cloud-init.yaml", dumped_netplan)
@pytest.mark.lxd_container
@@ -32,19 +33,19 @@ def _add_dummy_bridge_to_netplan(client: IntegrationInstance):
@pytest.mark.oci
@pytest.mark.openstack
def test_boot_event_disabled_by_default(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
- if 'network config is disabled' in log:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "network config is disabled" in log:
pytest.skip("network config disabled. Test doesn't apply")
- assert 'Applying network configuration' in log
- assert 'dummy0' not in client.execute('ls /sys/class/net')
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
_add_dummy_bridge_to_netplan(client)
- client.execute('rm /var/log/cloud-init.log')
+ client.execute("rm /var/log/cloud-init.log")
client.restart()
- log2 = client.read_from_file('/var/log/cloud-init.log')
+ log2 = client.read_from_file("/var/log/cloud-init.log")
- if 'cache invalid in datasource' in log2:
+ if "cache invalid in datasource" in log2:
# Invalid cache will get cleared, meaning we'll create a new
# "instance" and apply networking config, so events aren't
# really relevant here
@@ -53,8 +54,9 @@ def test_boot_event_disabled_by_default(client: IntegrationInstance):
# We attempt to apply network config twice on every boot.
# Ensure neither time works.
assert 2 == len(
- re.findall(r"Event Denied: scopes=\['network'\] EventType=boot[^-]",
- log2)
+ re.findall(
+ r"Event Denied: scopes=\['network'\] EventType=boot[^-]", log2
+ )
)
assert 2 == log2.count(
"Event Denied: scopes=['network'] EventType=boot-legacy"
@@ -64,30 +66,30 @@ def test_boot_event_disabled_by_default(client: IntegrationInstance):
" nor datasource network update allowed"
)
- assert 'dummy0' in client.execute('ls /sys/class/net')
+ assert "dummy0" in client.execute("ls /sys/class/net")
def _test_network_config_applied_on_reboot(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
- if 'network config is disabled' in log:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "network config is disabled" in log:
pytest.skip("network config disabled. Test doesn't apply")
- assert 'Applying network configuration' in log
- assert 'dummy0' not in client.execute('ls /sys/class/net')
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
_add_dummy_bridge_to_netplan(client)
client.execute('echo "" > /var/log/cloud-init.log')
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
- if 'cache invalid in datasource' in log:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "cache invalid in datasource" in log:
# Invalid cache will get cleared, meaning we'll create a new
# "instance" and apply networking config, so events aren't
# really relevant here
pytest.skip("Test only valid for existing instances")
- assert 'Event Allowed: scope=network EventType=boot' in log
- assert 'Applying network configuration' in log
- assert 'dummy0' not in client.execute('ls /sys/class/net')
+ assert "Event Allowed: scope=network EventType=boot" in log
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
@pytest.mark.azure
diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py
index bcb17b7f..3d1358ce 100644
--- a/tests/integration_tests/modules/test_users_groups.py
+++ b/tests/integration_tests/modules/test_users_groups.py
@@ -11,7 +11,6 @@ import pytest
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
-
USER_DATA = """\
#cloud-config
# Add groups to the system
@@ -84,7 +83,9 @@ class TestUsersGroups:
assert re.search(regex, result.stdout) is not None, (
"'getent {}' resulted in '{}', "
"but expected to match regex {}".format(
- ' '.join(getent_args), result.stdout, regex))
+ " ".join(getent_args), result.stdout, regex
+ )
+ )
def test_user_root_in_secret(self, class_client):
"""Test root user is in 'secret' group."""
@@ -105,19 +106,21 @@ def test_sudoers_includedir(client: IntegrationInstance):
https://github.com/canonical/cloud-init/pull/783
"""
if ImageSpecification.from_os_image().release in [
- 'xenial', 'bionic', 'focal'
+ "xenial",
+ "bionic",
+ "focal",
]:
raise pytest.skip(
- 'Test requires version of sudo installed on groovy and later'
+ "Test requires version of sudo installed on groovy and later"
)
client.execute("sed -i 's/#include/@include/g' /etc/sudoers")
- sudoers = client.read_from_file('/etc/sudoers')
- if '@includedir /etc/sudoers.d' not in sudoers:
+ sudoers = client.read_from_file("/etc/sudoers")
+ if "@includedir /etc/sudoers.d" not in sudoers:
client.execute("echo '@includedir /etc/sudoers.d' >> /etc/sudoers")
client.instance.clean()
client.restart()
- sudoers = client.read_from_file('/etc/sudoers')
+ sudoers = client.read_from_file("/etc/sudoers")
- assert '#includedir' not in sudoers
- assert sudoers.count('includedir /etc/sudoers.d') == 1
+ assert "#includedir" not in sudoers
+ assert sudoers.count("includedir /etc/sudoers.d") == 1
diff --git a/tests/integration_tests/modules/test_version_change.py b/tests/integration_tests/modules/test_version_change.py
index f28079d4..3168cd60 100644
--- a/tests/integration_tests/modules/test_version_change.py
+++ b/tests/integration_tests/modules/test_version_change.py
@@ -5,39 +5,40 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import ASSETS_DIR, verify_clean_log
-
-PICKLE_PATH = Path('/var/lib/cloud/instance/obj.pkl')
-TEST_PICKLE = ASSETS_DIR / 'test_version_change.pkl'
+PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl")
+TEST_PICKLE = ASSETS_DIR / "test_version_change.pkl"
def _assert_no_pickle_problems(log):
- assert 'Failed loading pickled blob' not in log
+ assert "Failed loading pickled blob" not in log
verify_clean_log(log)
def test_reboot_without_version_change(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Python version change detected' not in log
- assert 'Cache compatibility status is currently unknown.' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected" not in log
+ assert "Cache compatibility status is currently unknown." not in log
_assert_no_pickle_problems(log)
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Python version change detected' not in log
- assert 'Could not determine Python version used to write cache' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected" not in log
+ assert "Could not determine Python version used to write cache" not in log
_assert_no_pickle_problems(log)
# Now ensure that loading a bad pickle gives us problems
client.push_file(TEST_PICKLE, PICKLE_PATH)
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
# no cache found is an "expected" upgrade error, and
# "Failed" means we're unable to load the pickle
- assert any([
- 'Failed loading pickled blob from {}'.format(PICKLE_PATH) in log,
- 'no cache found' in log
- ])
+ assert any(
+ [
+ "Failed loading pickled blob from {}".format(PICKLE_PATH) in log,
+ "no cache found" in log,
+ ]
+ )
@pytest.mark.ec2
@@ -54,8 +55,8 @@ def test_cache_purged_on_version_change(client: IntegrationInstance):
client.push_file(TEST_PICKLE, PICKLE_PATH)
client.execute("echo '1.0' > /var/lib/cloud/data/python-version")
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Python version change detected. Purging cache' in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected. Purging cache" in log
_assert_no_pickle_problems(log)
@@ -65,11 +66,11 @@ def test_log_message_on_missing_version_file(client: IntegrationInstance):
client.execute("rm /var/lib/cloud/data/python-version")
client.execute("rm /var/log/cloud-init.log")
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
- if 'no cache found' not in log:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "no cache found" not in log:
# We don't expect the python version file to exist if we have no
# pre-existing cache
assert (
- 'Writing python-version file. '
- 'Cache compatibility status is currently unknown.'
- ) in log
+ "Writing python-version file. "
+ "Cache compatibility status is currently unknown." in log
+ )
diff --git a/tests/integration_tests/modules/test_write_files.py b/tests/integration_tests/modules/test_write_files.py
index 1d532fac..1eb7e945 100644
--- a/tests/integration_tests/modules/test_write_files.py
+++ b/tests/integration_tests/modules/test_write_files.py
@@ -7,8 +7,8 @@ and then checks if those files were created during boot.
``tests/cloud_tests/testcases/modules/write_files.yaml``.)"""
import base64
-import pytest
+import pytest
ASCII_TEXT = "ASCII text"
B64_CONTENT = base64.b64encode(ASCII_TEXT.encode("utf-8"))
@@ -50,25 +50,30 @@ write_files:
defer: true
owner: 'myuser'
permissions: '0644'
-""".format(B64_CONTENT.decode("ascii"))
+""".format(
+ B64_CONTENT.decode("ascii")
+)
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestWriteFiles:
-
@pytest.mark.parametrize(
- "cmd,expected_out", (
+ "cmd,expected_out",
+ (
("file /root/file_b64", ASCII_TEXT),
("md5sum </root/file_binary", "3801184b97bb8c6e63fa0e1eae2920d7"),
- ("sha256sum </root/file_binary", (
+ (
+ "sha256sum </root/file_binary",
"2c791c4037ea5bd7e928d6a87380f8ba"
- "7a803cd83d5e4f269e28f5090f0f2c9a"
- )),
- ("file /root/file_gzip",
- "POSIX shell script, ASCII text executable"),
+ "7a803cd83d5e4f269e28f5090f0f2c9a",
+ ),
+ (
+ "file /root/file_gzip",
+ "POSIX shell script, ASCII text executable",
+ ),
("file /root/file_text", ASCII_TEXT),
- )
+ ),
)
def test_write_files(self, cmd, expected_out, class_client):
out = class_client.execute(cmd)
@@ -82,6 +87,7 @@ class TestWriteFiles:
"""
out = class_client.read_from_file("/home/testuser/my-file")
assert "echo 'hello world!'" == out
- assert class_client.execute(
- 'stat -c "%U %a" /home/testuser/my-file'
- ) == 'myuser 644'
+ assert (
+ class_client.execute('stat -c "%U %a" /home/testuser/my-file')
+ == "myuser 644"
+ )
diff --git a/tests/integration_tests/test_upgrade.py b/tests/integration_tests/test_upgrade.py
index 0ba4754c..e53ea998 100644
--- a/tests/integration_tests/test_upgrade.py
+++ b/tests/integration_tests/test_upgrade.py
@@ -1,14 +1,14 @@
import json
import logging
import os
+
import pytest
from tests.integration_tests.clouds import ImageSpecification, IntegrationCloud
from tests.integration_tests.conftest import get_validated_source
from tests.integration_tests.util import verify_clean_log
-
-LOG = logging.getLogger('integration_testing.test_upgrade')
+LOG = logging.getLogger("integration_testing.test_upgrade")
LOG_TEMPLATE = """\n\
=== `systemd-analyze` before:
@@ -46,8 +46,10 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud):
if not source.installs_new_version():
pytest.skip(UNSUPPORTED_INSTALL_METHOD_MSG.format(source))
return # type checking doesn't understand that skip raises
- if (ImageSpecification.from_os_image().release == 'bionic' and
- session_cloud.settings.PLATFORM == 'lxd_vm'):
+ if (
+ ImageSpecification.from_os_image().release == "bionic"
+ and session_cloud.settings.PLATFORM == "lxd_vm"
+ ):
# The issues that we see on Bionic VMs don't appear anywhere
# else, including when calling KVM directly. It likely has to
# do with the extra lxd-agent setup happening on bionic.
@@ -57,32 +59,34 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud):
return
launch_kwargs = {
- 'image_id': session_cloud.released_image_id,
+ "image_id": session_cloud.released_image_id,
}
with session_cloud.launch(
- launch_kwargs=launch_kwargs, user_data=USER_DATA,
+ launch_kwargs=launch_kwargs,
+ user_data=USER_DATA,
) as instance:
# get pre values
- pre_hostname = instance.execute('hostname')
- pre_cloud_id = instance.execute('cloud-id')
- pre_result = instance.execute('cat /run/cloud-init/result.json')
- pre_network = instance.execute('cat /etc/netplan/50-cloud-init.yaml')
- pre_systemd_analyze = instance.execute('systemd-analyze')
- pre_systemd_blame = instance.execute('systemd-analyze blame')
- pre_cloud_analyze = instance.execute('cloud-init analyze show')
- pre_cloud_blame = instance.execute('cloud-init analyze blame')
+ pre_hostname = instance.execute("hostname")
+ pre_cloud_id = instance.execute("cloud-id")
+ pre_result = instance.execute("cat /run/cloud-init/result.json")
+ pre_network = instance.execute("cat /etc/netplan/50-cloud-init.yaml")
+ pre_systemd_analyze = instance.execute("systemd-analyze")
+ pre_systemd_blame = instance.execute("systemd-analyze blame")
+ pre_cloud_analyze = instance.execute("cloud-init analyze show")
+ pre_cloud_blame = instance.execute("cloud-init analyze blame")
# Ensure no issues pre-upgrade
- log = instance.read_from_file('/var/log/cloud-init.log')
- assert not json.loads(pre_result)['v1']['errors']
+ log = instance.read_from_file("/var/log/cloud-init.log")
+ assert not json.loads(pre_result)["v1"]["errors"]
try:
verify_clean_log(log)
except AssertionError:
LOG.warning(
- 'There were errors/warnings/tracebacks pre-upgrade. '
- 'Any failures may be due to pre-upgrade problem')
+ "There were errors/warnings/tracebacks pre-upgrade. "
+ "Any failures may be due to pre-upgrade problem"
+ )
# Upgrade
instance.install_new_cloud_init(source, take_snapshot=False)
@@ -91,27 +95,27 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud):
# have broken across re-constitution of a cached datasource. Some
# platforms invalidate their datasource cache on reboot, so we run
# it here to ensure we get a dirty run.
- assert instance.execute('cloud-init init').ok
+ assert instance.execute("cloud-init init").ok
# Reboot
- instance.execute('hostname something-else')
+ instance.execute("hostname something-else")
instance.restart()
- assert instance.execute('cloud-init status --wait --long').ok
+ assert instance.execute("cloud-init status --wait --long").ok
# get post values
- post_hostname = instance.execute('hostname')
- post_cloud_id = instance.execute('cloud-id')
- post_result = instance.execute('cat /run/cloud-init/result.json')
- post_network = instance.execute('cat /etc/netplan/50-cloud-init.yaml')
- post_systemd_analyze = instance.execute('systemd-analyze')
- post_systemd_blame = instance.execute('systemd-analyze blame')
- post_cloud_analyze = instance.execute('cloud-init analyze show')
- post_cloud_blame = instance.execute('cloud-init analyze blame')
+ post_hostname = instance.execute("hostname")
+ post_cloud_id = instance.execute("cloud-id")
+ post_result = instance.execute("cat /run/cloud-init/result.json")
+ post_network = instance.execute("cat /etc/netplan/50-cloud-init.yaml")
+ post_systemd_analyze = instance.execute("systemd-analyze")
+ post_systemd_blame = instance.execute("systemd-analyze blame")
+ post_cloud_analyze = instance.execute("cloud-init analyze show")
+ post_cloud_blame = instance.execute("cloud-init analyze blame")
# Ensure no issues post-upgrade
- assert not json.loads(pre_result)['v1']['errors']
+ assert not json.loads(pre_result)["v1"]["errors"]
- log = instance.read_from_file('/var/log/cloud-init.log')
+ log = instance.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
# Ensure important things stayed the same
@@ -120,36 +124,46 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud):
try:
assert pre_result == post_result
except AssertionError:
- if instance.settings.PLATFORM == 'azure':
+ if instance.settings.PLATFORM == "azure":
pre_json = json.loads(pre_result)
post_json = json.loads(post_result)
- assert pre_json['v1']['datasource'].startswith(
- 'DataSourceAzure')
- assert post_json['v1']['datasource'].startswith(
- 'DataSourceAzure')
+ assert pre_json["v1"]["datasource"].startswith(
+ "DataSourceAzure"
+ )
+ assert post_json["v1"]["datasource"].startswith(
+ "DataSourceAzure"
+ )
assert pre_network == post_network
# Calculate and log all the boot numbers
pre_analyze_totals = [
- x for x in pre_cloud_analyze.splitlines()
- if x.startswith('Finished stage') or x.startswith('Total Time')
+ x
+ for x in pre_cloud_analyze.splitlines()
+ if x.startswith("Finished stage") or x.startswith("Total Time")
]
post_analyze_totals = [
- x for x in post_cloud_analyze.splitlines()
- if x.startswith('Finished stage') or x.startswith('Total Time')
+ x
+ for x in post_cloud_analyze.splitlines()
+ if x.startswith("Finished stage") or x.startswith("Total Time")
]
# pylint: disable=logging-format-interpolation
- LOG.info(LOG_TEMPLATE.format(
- pre_systemd_analyze=pre_systemd_analyze,
- post_systemd_analyze=post_systemd_analyze,
- pre_systemd_blame='\n'.join(pre_systemd_blame.splitlines()[:10]),
- post_systemd_blame='\n'.join(post_systemd_blame.splitlines()[:10]),
- pre_analyze_totals='\n'.join(pre_analyze_totals),
- post_analyze_totals='\n'.join(post_analyze_totals),
- pre_cloud_blame='\n'.join(pre_cloud_blame.splitlines()[:10]),
- post_cloud_blame='\n'.join(post_cloud_blame.splitlines()[:10]),
- ))
+ LOG.info(
+ LOG_TEMPLATE.format(
+ pre_systemd_analyze=pre_systemd_analyze,
+ post_systemd_analyze=post_systemd_analyze,
+ pre_systemd_blame="\n".join(
+ pre_systemd_blame.splitlines()[:10]
+ ),
+ post_systemd_blame="\n".join(
+ post_systemd_blame.splitlines()[:10]
+ ),
+ pre_analyze_totals="\n".join(pre_analyze_totals),
+ post_analyze_totals="\n".join(post_analyze_totals),
+ pre_cloud_blame="\n".join(pre_cloud_blame.splitlines()[:10]),
+ post_cloud_blame="\n".join(post_cloud_blame.splitlines()[:10]),
+ )
+ )
@pytest.mark.ci
@@ -157,18 +171,18 @@ def test_clean_boot_of_upgraded_package(session_cloud: IntegrationCloud):
def test_subsequent_boot_of_upgraded_package(session_cloud: IntegrationCloud):
source = get_validated_source(session_cloud)
if not source.installs_new_version():
- if os.environ.get('TRAVIS'):
+ if os.environ.get("TRAVIS"):
# If this isn't running on CI, we should know
pytest.fail(UNSUPPORTED_INSTALL_METHOD_MSG.format(source))
else:
pytest.skip(UNSUPPORTED_INSTALL_METHOD_MSG.format(source))
return # type checking doesn't understand that skip raises
- launch_kwargs = {'image_id': session_cloud.released_image_id}
+ launch_kwargs = {"image_id": session_cloud.released_image_id}
with session_cloud.launch(launch_kwargs=launch_kwargs) as instance:
instance.install_new_cloud_init(
source, take_snapshot=False, clean=False
)
instance.restart()
- assert instance.execute('cloud-init status --wait --long').ok
+ assert instance.execute("cloud-init status --wait --long").ok
diff --git a/tests/integration_tests/util.py b/tests/integration_tests/util.py
index e40d80fe..31fe69c0 100644
--- a/tests/integration_tests/util.py
+++ b/tests/integration_tests/util.py
@@ -3,16 +3,15 @@ import logging
import multiprocessing
import os
import time
-from contextlib import contextmanager
from collections import namedtuple
+from contextlib import contextmanager
from pathlib import Path
+log = logging.getLogger("integration_testing")
+key_pair = namedtuple("key_pair", "public_key private_key")
-log = logging.getLogger('integration_testing')
-key_pair = namedtuple('key_pair', 'public_key private_key')
-
-ASSETS_DIR = Path('tests/integration_tests/assets')
-KEY_PATH = ASSETS_DIR / 'keys'
+ASSETS_DIR = Path("tests/integration_tests/assets")
+KEY_PATH = ASSETS_DIR / "keys"
def verify_ordered_items_in_text(to_verify: list, text: str):
@@ -30,26 +29,27 @@ def verify_ordered_items_in_text(to_verify: list, text: str):
def verify_clean_log(log):
"""Assert no unexpected tracebacks or warnings in logs"""
- warning_count = log.count('WARN')
+ warning_count = log.count("WARN")
expected_warnings = 0
- traceback_count = log.count('Traceback')
+ traceback_count = log.count("Traceback")
expected_tracebacks = 0
warning_texts = [
# Consistently on all Azure launches:
# azure.py[WARNING]: No lease found; using default endpoint
- 'No lease found; using default endpoint'
+ "No lease found; using default endpoint"
]
traceback_texts = []
- if 'oracle' in log:
+ if "oracle" in log:
# LP: #1842752
- lease_exists_text = 'Stderr: RTNETLINK answers: File exists'
+ lease_exists_text = "Stderr: RTNETLINK answers: File exists"
warning_texts.append(lease_exists_text)
traceback_texts.append(lease_exists_text)
# LP: #1833446
fetch_error_text = (
- 'UrlError: 404 Client Error: Not Found for url: '
- 'http://169.254.169.254/latest/meta-data/')
+ "UrlError: 404 Client Error: Not Found for url: "
+ "http://169.254.169.254/latest/meta-data/"
+ )
warning_texts.append(fetch_error_text)
traceback_texts.append(fetch_error_text)
# Oracle has a file in /etc/cloud/cloud.cfg.d that contains
@@ -59,7 +59,7 @@ def verify_clean_log(log):
# ssh_redirect_user: true
# This can trigger a warning about opc having no public key
warning_texts.append(
- 'Unable to disable SSH logins for opc given ssh_redirect_user'
+ "Unable to disable SSH logins for opc given ssh_redirect_user"
)
for warning_text in warning_texts:
@@ -82,7 +82,7 @@ def emit_dots_on_travis():
It should be wrapped selectively around operations that are known to take a
long time.
"""
- if os.environ.get('TRAVIS') != "true":
+ if os.environ.get("TRAVIS") != "true":
# If we aren't on Travis, don't do anything.
yield
return
@@ -100,9 +100,9 @@ def emit_dots_on_travis():
dot_process.terminate()
-def get_test_rsa_keypair(key_name: str = 'test1') -> key_pair:
- private_key_path = KEY_PATH / 'id_rsa.{}'.format(key_name)
- public_key_path = KEY_PATH / 'id_rsa.{}.pub'.format(key_name)
+def get_test_rsa_keypair(key_name: str = "test1") -> key_pair:
+ private_key_path = KEY_PATH / "id_rsa.{}".format(key_name)
+ public_key_path = KEY_PATH / "id_rsa.{}.pub".format(key_name)
with public_key_path.open() as public_file:
public_key = public_file.read()
with private_key_path.open() as private_file:
@@ -121,6 +121,7 @@ def retry(*, tries: int = 30, delay: int = 1):
def try_something_that_may_not_be_ready():
...
"""
+
def _retry(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
@@ -135,5 +136,7 @@ def retry(*, tries: int = 30, delay: int = 1):
else:
if last_error:
raise last_error
+
return wrapper
+
return _retry