summaryrefslogtreecommitdiff
path: root/tests/integration_tests/modules
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration_tests/modules')
-rw-r--r--tests/integration_tests/modules/test_apt.py88
-rw-r--r--tests/integration_tests/modules/test_ca_certs.py1
-rw-r--r--tests/integration_tests/modules/test_cli.py9
-rw-r--r--tests/integration_tests/modules/test_combined.py155
-rw-r--r--tests/integration_tests/modules/test_command_output.py5
-rw-r--r--tests/integration_tests/modules/test_disk_setup.py76
-rw-r--r--tests/integration_tests/modules/test_growpart.py38
-rw-r--r--tests/integration_tests/modules/test_hotplug.py55
-rw-r--r--tests/integration_tests/modules/test_jinja_templating.py11
-rw-r--r--tests/integration_tests/modules/test_keys_to_console.py9
-rw-r--r--tests/integration_tests/modules/test_lxd_bridge.py2
-rw-r--r--tests/integration_tests/modules/test_ntp_servers.py30
-rw-r--r--tests/integration_tests/modules/test_package_update_upgrade_install.py18
-rw-r--r--tests/integration_tests/modules/test_persistence.py26
-rw-r--r--tests/integration_tests/modules/test_power_state_change.py48
-rw-r--r--tests/integration_tests/modules/test_puppet.py6
-rw-r--r--tests/integration_tests/modules/test_set_hostname.py10
-rw-r--r--tests/integration_tests/modules/test_set_password.py15
-rw-r--r--tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py13
-rw-r--r--tests/integration_tests/modules/test_ssh_generate.py16
-rw-r--r--tests/integration_tests/modules/test_ssh_keys_provided.py58
-rw-r--r--tests/integration_tests/modules/test_ssh_keysfile.py159
-rw-r--r--tests/integration_tests/modules/test_user_events.py50
-rw-r--r--tests/integration_tests/modules/test_users_groups.py21
-rw-r--r--tests/integration_tests/modules/test_version_change.py45
-rw-r--r--tests/integration_tests/modules/test_write_files.py32
26 files changed, 524 insertions, 472 deletions
diff --git a/tests/integration_tests/modules/test_apt.py b/tests/integration_tests/modules/test_apt.py
index f5f6c813..48f398d1 100644
--- a/tests/integration_tests/modules/test_apt.py
+++ b/tests/integration_tests/modules/test_apt.py
@@ -3,12 +3,11 @@ import re
import pytest
-from cloudinit.config import cc_apt_configure
from cloudinit import gpg
+from cloudinit.config import cc_apt_configure
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
-
USER_DATA = """\
#cloud-config
apt:
@@ -104,14 +103,15 @@ class TestApt:
"""Return all keys in /etc/apt/trusted.gpg.d/ and /etc/apt/trusted.gpg
in human readable format. Mimics the output of apt-key finger
"""
- list_cmd = ' '.join(gpg.GPG_LIST) + ' '
+ list_cmd = " ".join(gpg.GPG_LIST) + " "
keys = class_client.execute(list_cmd + cc_apt_configure.APT_LOCAL_KEYS)
print(keys)
files = class_client.execute(
- 'ls ' + cc_apt_configure.APT_TRUSTED_GPG_DIR)
+ "ls " + cc_apt_configure.APT_TRUSTED_GPG_DIR
+ )
for file in files.split():
path = cc_apt_configure.APT_TRUSTED_GPG_DIR + file
- keys += class_client.execute(list_cmd + path) or ''
+ keys += class_client.execute(list_cmd + path) or ""
return keys
def test_sources_list(self, class_client: IntegrationInstance):
@@ -124,8 +124,8 @@ class TestApt:
(This is ported from
`tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml`.)
"""
- sources_list = class_client.read_from_file('/etc/apt/sources.list')
- assert 6 == len(sources_list.rstrip().split('\n'))
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
+ assert 6 == len(sources_list.rstrip().split("\n"))
for expected_re in EXPECTED_REGEXES:
assert re.search(expected_re, sources_list) is not None
@@ -136,7 +136,7 @@ class TestApt:
Ported from tests/cloud_tests/testcases/modules/apt_configure_conf.py
"""
apt_config = class_client.read_from_file(
- '/etc/apt/apt.conf.d/94cloud-init-config'
+ "/etc/apt/apt.conf.d/94cloud-init-config"
)
assert 'Assume-Yes "true";' in apt_config
assert 'Fix-Broken "true";' in apt_config
@@ -149,40 +149,43 @@ class TestApt:
"""
release = ImageSpecification.from_os_image().release
ppa_path_contents = class_client.read_from_file(
- '/etc/apt/sources.list.d/'
- 'simplestreams-dev-ubuntu-trunk-{}.list'.format(release)
+ "/etc/apt/sources.list.d/"
+ "simplestreams-dev-ubuntu-trunk-{}.list".format(release)
)
assert (
- 'http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu'
- ) in ppa_path_contents
+ "http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu"
+ in ppa_path_contents
+ )
assert TEST_PPA_KEY in self.get_keys(class_client)
def test_signed_by(self, class_client: IntegrationInstance):
- """Test the apt signed-by functionality.
- """
+ """Test the apt signed-by functionality."""
release = ImageSpecification.from_os_image().release
source = (
"deb [signed-by=/etc/apt/cloud-init.gpg.d/test_signed_by.gpg] "
"http://ppa.launchpad.net/juju/stable/ubuntu"
- " {} main".format(release))
+ " {} main".format(release)
+ )
path_contents = class_client.read_from_file(
- '/etc/apt/sources.list.d/test_signed_by.list')
+ "/etc/apt/sources.list.d/test_signed_by.list"
+ )
assert path_contents == source
key = class_client.execute(
- 'gpg --no-default-keyring --with-fingerprint --list-keys '
- '--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg')
+ "gpg --no-default-keyring --with-fingerprint --list-keys "
+ "--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg"
+ )
assert TEST_SIGNED_BY_KEY in key
def test_bad_key(self, class_client: IntegrationInstance):
- """Test the apt signed-by functionality.
- """
+ """Test the apt signed-by functionality."""
with pytest.raises(OSError):
class_client.read_from_file(
- '/etc/apt/trusted.list.d/test_bad_key.gpg')
+ "/etc/apt/trusted.list.d/test_bad_key.gpg"
+ )
def test_key(self, class_client: IntegrationInstance):
"""Test the apt key functionality.
@@ -191,12 +194,13 @@ class TestApt:
tests/cloud_tests/testcases/modules/apt_configure_sources_key.py
"""
test_archive_contents = class_client.read_from_file(
- '/etc/apt/sources.list.d/test_key.list'
+ "/etc/apt/sources.list.d/test_key.list"
)
assert (
- 'http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu'
- ) in test_archive_contents
+ "http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu"
+ in test_archive_contents
+ )
assert TEST_KEY in self.get_keys(class_client)
def test_keyserver(self, class_client: IntegrationInstance):
@@ -206,12 +210,13 @@ class TestApt:
tests/cloud_tests/testcases/modules/apt_configure_sources_keyserver.py
"""
test_keyserver_contents = class_client.read_from_file(
- '/etc/apt/sources.list.d/test_keyserver.list'
+ "/etc/apt/sources.list.d/test_keyserver.list"
)
assert (
- 'http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu'
- ) in test_keyserver_contents
+ "http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu"
+ in test_keyserver_contents
+ )
assert TEST_KEYSERVER_KEY in self.get_keys(class_client)
@@ -221,7 +226,7 @@ class TestApt:
Ported from tests/cloud_tests/testcases/modules/apt_pipelining_os.py
"""
conf_exists = class_client.execute(
- 'test -f /etc/apt/apt.conf.d/90cloud-init-pipelining'
+ "test -f /etc/apt/apt.conf.d/90cloud-init-pipelining"
).ok
assert conf_exists is False
@@ -237,7 +242,7 @@ apt:
- arches:
- default
"""
-DEFAULT_DATA = _DEFAULT_DATA.format(uri='')
+DEFAULT_DATA = _DEFAULT_DATA.format(uri="")
@pytest.mark.ubuntu
@@ -249,9 +254,9 @@ class TestDefaults:
When no uri is provided.
"""
- zone = class_client.execute('cloud-init query v1.availability_zone')
- sources_list = class_client.read_from_file('/etc/apt/sources.list')
- assert '{}.clouds.archive.ubuntu.com'.format(zone) in sources_list
+ zone = class_client.execute("cloud-init query v1.availability_zone")
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
+ assert "{}.clouds.archive.ubuntu.com".format(zone) in sources_list
def test_security(self, class_client: IntegrationInstance):
"""Test apt default security sources.
@@ -259,12 +264,12 @@ class TestDefaults:
Ported from
tests/cloud_tests/testcases/modules/apt_configure_security.py
"""
- sources_list = class_client.read_from_file('/etc/apt/sources.list')
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
# 3 lines from main, universe, and multiverse
- assert 3 == sources_list.count('deb http://security.ubuntu.com/ubuntu')
+ assert 3 == sources_list.count("deb http://security.ubuntu.com/ubuntu")
assert 3 == sources_list.count(
- '# deb-src http://security.ubuntu.com/ubuntu'
+ "# deb-src http://security.ubuntu.com/ubuntu"
)
@@ -280,10 +285,10 @@ def test_default_primary_with_uri(client: IntegrationInstance):
Ported from
tests/cloud_tests/testcases/modules/apt_configure_primary.py
"""
- sources_list = client.read_from_file('/etc/apt/sources.list')
- assert 'archive.ubuntu.com' not in sources_list
+ sources_list = client.read_from_file("/etc/apt/sources.list")
+ assert "archive.ubuntu.com" not in sources_list
- assert 'something.random.invalid' in sources_list
+ assert "something.random.invalid" in sources_list
DISABLED_DATA = """\
@@ -310,7 +315,7 @@ class TestDisabled:
sources_list = class_client.execute(
"cat /etc/apt/sources.list | grep -v '^#'"
).strip()
- assert '' == sources_list
+ assert "" == sources_list
def test_disable_apt_pipelining(self, class_client: IntegrationInstance):
"""Test disabling of apt pipelining.
@@ -319,7 +324,7 @@ class TestDisabled:
tests/cloud_tests/testcases/modules/apt_pipelining_disable.py
"""
conf = class_client.read_from_file(
- '/etc/apt/apt.conf.d/90cloud-init-pipelining'
+ "/etc/apt/apt.conf.d/90cloud-init-pipelining"
)
assert 'Acquire::http::Pipeline-Depth "0";' in conf
@@ -338,8 +343,7 @@ apt:
@pytest.mark.user_data(APT_PROXY_DATA)
def test_apt_proxy(client: IntegrationInstance):
"""Test the apt proxy data gets written correctly."""
- out = client.read_from_file(
- '/etc/apt/apt.conf.d/90cloud-init-aptproxy')
+ out = client.read_from_file("/etc/apt/apt.conf.d/90cloud-init-aptproxy")
assert 'Acquire::http::Proxy "http://proxy.internal:3128";' in out
assert 'Acquire::http::Proxy "http://squid.internal:3128";' in out
assert 'Acquire::ftp::Proxy "ftp://squid.internal:3128";' in out
diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py
index 89c01a9c..d514fc62 100644
--- a/tests/integration_tests/modules/test_ca_certs.py
+++ b/tests/integration_tests/modules/test_ca_certs.py
@@ -10,7 +10,6 @@ import os.path
import pytest
-
USER_DATA = """\
#cloud-config
ca-certs:
diff --git a/tests/integration_tests/modules/test_cli.py b/tests/integration_tests/modules/test_cli.py
index 3f41b34d..97bfe52d 100644
--- a/tests/integration_tests/modules/test_cli.py
+++ b/tests/integration_tests/modules/test_cli.py
@@ -7,7 +7,6 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
-
VALID_USER_DATA = """\
#cloud-config
runcmd:
@@ -27,9 +26,9 @@ def test_valid_userdata(client: IntegrationInstance):
PR #575
"""
- result = client.execute('cloud-init devel schema --system')
+ result = client.execute("cloud-init devel schema --system")
assert result.ok
- assert 'Valid cloud-config: system userdata' == result.stdout.strip()
+ assert "Valid cloud-config: system userdata" == result.stdout.strip()
@pytest.mark.sru_2020_11
@@ -39,7 +38,7 @@ def test_invalid_userdata(client: IntegrationInstance):
PR #575
"""
- result = client.execute('cloud-init devel schema --system')
+ result = client.execute("cloud-init devel schema --system")
assert not result.ok
- assert 'Cloud config schema errors' in result.stderr
+ assert "Cloud config schema errors" in result.stderr
assert 'needs to begin with "#cloud-config"' in result.stderr
diff --git a/tests/integration_tests/modules/test_combined.py b/tests/integration_tests/modules/test_combined.py
index 26a8397d..c88f40d3 100644
--- a/tests/integration_tests/modules/test_combined.py
+++ b/tests/integration_tests/modules/test_combined.py
@@ -6,9 +6,10 @@ the same instance launch. Most independent module coherence tests can go
here.
"""
import json
-import pytest
import re
+import pytest
+
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import (
@@ -76,7 +77,7 @@ class TestCombined:
Also tests LP 1511485: final_message is silent.
"""
client = class_client
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
expected = (
"This is my final message!\n"
r"\d+\.\d+.*\n"
@@ -94,10 +95,10 @@ class TestCombined:
configuring the archives.
"""
client = class_client
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'W: Failed to fetch' not in log
- assert 'W: Some index files failed to download' not in log
- assert 'E: Unable to locate package ntp' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "W: Failed to fetch" not in log
+ assert "W: Some index files failed to download" not in log
+ assert "E: Unable to locate package ntp" not in log
def test_byobu(self, class_client: IntegrationInstance):
"""Test byobu configured as enabled by default."""
@@ -107,22 +108,18 @@ class TestCombined:
def test_configured_locale(self, class_client: IntegrationInstance):
"""Test locale can be configured correctly."""
client = class_client
- default_locale = client.read_from_file('/etc/default/locale')
- assert 'LANG=en_GB.UTF-8' in default_locale
+ default_locale = client.read_from_file("/etc/default/locale")
+ assert "LANG=en_GB.UTF-8" in default_locale
- locale_a = client.execute('locale -a')
- verify_ordered_items_in_text([
- 'en_GB.utf8',
- 'en_US.utf8'
- ], locale_a)
+ locale_a = client.execute("locale -a")
+ verify_ordered_items_in_text(["en_GB.utf8", "en_US.utf8"], locale_a)
locale_gen = client.execute(
"cat /etc/locale.gen | grep -v '^#' | uniq"
)
- verify_ordered_items_in_text([
- 'en_GB.UTF-8',
- 'en_US.UTF-8'
- ], locale_gen)
+ verify_ordered_items_in_text(
+ ["en_GB.UTF-8", "en_US.UTF-8"], locale_gen
+ )
def test_random_seed_data(self, class_client: IntegrationInstance):
"""Integration test for the random seed module.
@@ -141,12 +138,12 @@ class TestCombined:
def test_rsyslog(self, class_client: IntegrationInstance):
"""Test rsyslog is configured correctly."""
client = class_client
- assert 'My test log' in client.read_from_file('/var/tmp/rsyslog.log')
+ assert "My test log" in client.read_from_file("/var/tmp/rsyslog.log")
def test_runcmd(self, class_client: IntegrationInstance):
"""Test runcmd works as expected"""
client = class_client
- assert 'hello world' == client.read_from_file('/var/tmp/runcmd_output')
+ assert "hello world" == client.read_from_file("/var/tmp/runcmd_output")
@retry(tries=30, delay=1)
def test_ssh_import_id(self, class_client: IntegrationInstance):
@@ -160,11 +157,10 @@ class TestCombined:
/home/ubuntu; this will need modification to run on other OSes.
"""
client = class_client
- ssh_output = client.read_from_file(
- "/home/ubuntu/.ssh/authorized_keys")
+ ssh_output = client.read_from_file("/home/ubuntu/.ssh/authorized_keys")
- assert '# ssh-import-id gh:powersj' in ssh_output
- assert '# ssh-import-id lp:smoser' in ssh_output
+ assert "# ssh-import-id gh:powersj" in ssh_output
+ assert "# ssh-import-id lp:smoser" in ssh_output
def test_snap(self, class_client: IntegrationInstance):
"""Integration test for the snap module.
@@ -185,21 +181,22 @@ class TestCombined:
"""
client = class_client
timezone_output = client.execute(
- 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"')
+ 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"'
+ )
assert timezone_output.strip() == "HDT"
def test_no_problems(self, class_client: IntegrationInstance):
"""Test no errors, warnings, or tracebacks"""
client = class_client
- status_file = client.read_from_file('/run/cloud-init/status.json')
- status_json = json.loads(status_file)['v1']
- for stage in ('init', 'init-local', 'modules-config', 'modules-final'):
- assert status_json[stage]['errors'] == []
- result_file = client.read_from_file('/run/cloud-init/result.json')
- result_json = json.loads(result_file)['v1']
- assert result_json['errors'] == []
-
- log = client.read_from_file('/var/log/cloud-init.log')
+ status_file = client.read_from_file("/run/cloud-init/status.json")
+ status_json = json.loads(status_file)["v1"]
+ for stage in ("init", "init-local", "modules-config", "modules-final"):
+ assert status_json[stage]["errors"] == []
+ result_file = client.read_from_file("/run/cloud-init/result.json")
+ result_json = json.loads(result_file)["v1"]
+ assert result_json["errors"] == []
+
+ log = client.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
def test_correct_datasource_detected(
@@ -228,73 +225,81 @@ class TestCombined:
)
def _check_common_metadata(self, data):
- assert data['base64_encoded_keys'] == []
- assert data['merged_cfg'] == 'redacted for non-root user'
+ assert data["base64_encoded_keys"] == []
+ assert data["merged_cfg"] == "redacted for non-root user"
image_spec = ImageSpecification.from_os_image()
- assert data['sys_info']['dist'][0] == image_spec.os
+ assert data["sys_info"]["dist"][0] == image_spec.os
- v1_data = data['v1']
- assert re.match(r'\d\.\d+\.\d+-\d+', v1_data['kernel_release'])
- assert v1_data['variant'] == image_spec.os
- assert v1_data['distro'] == image_spec.os
- assert v1_data['distro_release'] == image_spec.release
- assert v1_data['machine'] == 'x86_64'
- assert re.match(r'3.\d\.\d', v1_data['python_version'])
+ v1_data = data["v1"]
+ assert re.match(r"\d\.\d+\.\d+-\d+", v1_data["kernel_release"])
+ assert v1_data["variant"] == image_spec.os
+ assert v1_data["distro"] == image_spec.os
+ assert v1_data["distro_release"] == image_spec.release
+ assert v1_data["machine"] == "x86_64"
+ assert re.match(r"3.\d\.\d", v1_data["python_version"])
@pytest.mark.lxd_container
def test_instance_json_lxd(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
self._check_common_metadata(data)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'unknown'
- assert v1_data['platform'] == 'lxd'
- assert v1_data['subplatform'] == (
- 'seed-dir (/var/lib/cloud/seed/nocloud-net)')
- assert v1_data['availability_zone'] is None
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'] == client.instance.name
- assert v1_data['region'] is None
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "unknown"
+ assert v1_data["platform"] == "lxd"
+ assert (
+ v1_data["subplatform"]
+ == "seed-dir (/var/lib/cloud/seed/nocloud-net)"
+ )
+ assert v1_data["availability_zone"] is None
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"] == client.instance.name
+ assert v1_data["region"] is None
@pytest.mark.lxd_vm
def test_instance_json_lxd_vm(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
self._check_common_metadata(data)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'unknown'
- assert v1_data['platform'] == 'lxd'
- assert any([
- '/var/lib/cloud/seed/nocloud-net' in v1_data['subplatform'],
- '/dev/sr0' in v1_data['subplatform']
- ])
- assert v1_data['availability_zone'] is None
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'] == client.instance.name
- assert v1_data['region'] is None
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "unknown"
+ assert v1_data["platform"] == "lxd"
+ assert any(
+ [
+ "/var/lib/cloud/seed/nocloud-net" in v1_data["subplatform"],
+ "/dev/sr0" in v1_data["subplatform"],
+ ]
+ )
+ assert v1_data["availability_zone"] is None
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"] == client.instance.name
+ assert v1_data["region"] is None
@pytest.mark.ec2
def test_instance_json_ec2(self, class_client: IntegrationInstance):
client = class_client
instance_json_file = client.read_from_file(
- '/run/cloud-init/instance-data.json')
+ "/run/cloud-init/instance-data.json"
+ )
data = json.loads(instance_json_file)
- v1_data = data['v1']
- assert v1_data['cloud_name'] == 'aws'
- assert v1_data['platform'] == 'ec2'
- assert v1_data['subplatform'].startswith('metadata')
- assert v1_data[
- 'availability_zone'] == client.instance.availability_zone
- assert v1_data['instance_id'] == client.instance.name
- assert v1_data['local_hostname'].startswith('ip-')
- assert v1_data['region'] == client.cloud.cloud_instance.region
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "aws"
+ assert v1_data["platform"] == "ec2"
+ assert v1_data["subplatform"].startswith("metadata")
+ assert (
+ v1_data["availability_zone"] == client.instance.availability_zone
+ )
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"].startswith("ip-")
+ assert v1_data["region"] == client.cloud.cloud_instance.region
@pytest.mark.gce
def test_instance_json_gce(self, class_client: IntegrationInstance):
diff --git a/tests/integration_tests/modules/test_command_output.py b/tests/integration_tests/modules/test_command_output.py
index 8429873f..96525cac 100644
--- a/tests/integration_tests/modules/test_command_output.py
+++ b/tests/integration_tests/modules/test_command_output.py
@@ -8,7 +8,6 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
-
USER_DATA = """\
#cloud-config
output: { all: "| tee -a /var/log/cloud-init-test-output" }
@@ -18,5 +17,5 @@ final_message: "should be last line in cloud-init-test-output file"
@pytest.mark.user_data(USER_DATA)
def test_runcmd(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init-test-output')
- assert 'should be last line in cloud-init-test-output file' in log
+ log = client.read_from_file("/var/log/cloud-init-test-output")
+ assert "should be last line in cloud-init-test-output file" in log
diff --git a/tests/integration_tests/modules/test_disk_setup.py b/tests/integration_tests/modules/test_disk_setup.py
index 9c9edc46..22277331 100644
--- a/tests/integration_tests/modules/test_disk_setup.py
+++ b/tests/integration_tests/modules/test_disk_setup.py
@@ -1,25 +1,29 @@
import json
import os
-import pytest
from uuid import uuid4
+
+import pytest
from pycloudlib.lxd.instance import LXDInstance
from cloudinit.subp import subp
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import verify_clean_log
-DISK_PATH = '/tmp/test_disk_setup_{}'.format(uuid4())
+DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4())
def setup_and_mount_lxd_disk(instance: LXDInstance):
- subp('lxc config device add {} test-disk-setup-disk disk source={}'.format(
- instance.name, DISK_PATH).split())
+ subp(
+ "lxc config device add {} test-disk-setup-disk disk source={}".format(
+ instance.name, DISK_PATH
+ ).split()
+ )
@pytest.yield_fixture
def create_disk():
# 640k should be enough for anybody
- subp('dd if=/dev/zero of={} bs=1k count=640'.format(DISK_PATH).split())
+ subp("dd if=/dev/zero of={} bs=1k count=640".format(DISK_PATH).split())
yield
os.remove(DISK_PATH)
@@ -54,21 +58,21 @@ class TestDeviceAliases:
"""Test devices aliases work on disk setup/mount"""
def test_device_alias(self, create_disk, client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
assert (
- "updated disk_setup device entry 'my_alias' to '/dev/sdb'"
- ) in log
- assert 'changed my_alias.1 => /dev/sdb1' in log
- assert 'changed my_alias.2 => /dev/sdb2' in log
+ "updated disk_setup device entry 'my_alias' to '/dev/sdb'" in log
+ )
+ assert "changed my_alias.1 => /dev/sdb1" in log
+ assert "changed my_alias.2 => /dev/sdb2" in log
verify_clean_log(log)
- lsblk = json.loads(client.execute('lsblk --json'))
- sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0]
- assert len(sdb['children']) == 2
- assert sdb['children'][0]['name'] == 'sdb1'
- assert sdb['children'][0]['mountpoint'] == '/mnt1'
- assert sdb['children'][1]['name'] == 'sdb2'
- assert sdb['children'][1]['mountpoint'] == '/mnt2'
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 2
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["children"][0]["mountpoint"] == "/mnt1"
+ assert sdb["children"][1]["name"] == "sdb2"
+ assert sdb["children"][1]["mountpoint"] == "/mnt2"
PARTPROBE_USERDATA = """\
@@ -121,13 +125,13 @@ class TestPartProbeAvailability:
def _verify_first_disk_setup(self, client, log):
verify_clean_log(log)
- lsblk = json.loads(client.execute('lsblk --json'))
- sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0]
- assert len(sdb['children']) == 2
- assert sdb['children'][0]['name'] == 'sdb1'
- assert sdb['children'][0]['mountpoint'] == '/mnt1'
- assert sdb['children'][1]['name'] == 'sdb2'
- assert sdb['children'][1]['mountpoint'] == '/mnt2'
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 2
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["children"][0]["mountpoint"] == "/mnt1"
+ assert sdb["children"][1]["name"] == "sdb2"
+ assert sdb["children"][1]["mountpoint"] == "/mnt2"
# Not bionic or xenial because the LXD agent gets in the way of us
# changing the userdata
@@ -148,13 +152,13 @@ class TestPartProbeAvailability:
with a warning and a traceback. When partprobe is in use, everything
should work successfully.
"""
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
self._verify_first_disk_setup(client, log)
# Update our userdata and cloud.cfg to mount then perform new disk
# setup
client.write_to_file(
- '/var/lib/cloud/seed/nocloud-net/user-data',
+ "/var/lib/cloud/seed/nocloud-net/user-data",
UPDATED_PARTPROBE_USERDATA,
)
client.execute(
@@ -162,17 +166,17 @@ class TestPartProbeAvailability:
"/etc/cloud/cloud.cfg"
)
- client.execute('cloud-init clean --logs')
+ client.execute("cloud-init clean --logs")
client.restart()
# Assert new setup works as expected
verify_clean_log(log)
- lsblk = json.loads(client.execute('lsblk --json'))
- sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0]
- assert len(sdb['children']) == 1
- assert sdb['children'][0]['name'] == 'sdb1'
- assert sdb['children'][0]['mountpoint'] == '/mnt3'
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 1
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["children"][0]["mountpoint"] == "/mnt3"
def test_disk_setup_no_partprobe(
self, create_disk, client: IntegrationInstance
@@ -180,11 +184,11 @@ class TestPartProbeAvailability:
"""Ensure disk setup still works as expected without partprobe."""
# We can't do this part in a bootcmd because the path has already
# been found by the time we get to the bootcmd
- client.execute('rm $(which partprobe)')
- client.execute('cloud-init clean --logs')
+ client.execute("rm $(which partprobe)")
+ client.execute("cloud-init clean --logs")
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
self._verify_first_disk_setup(client, log)
- assert 'partprobe' not in log
+ assert "partprobe" not in log
diff --git a/tests/integration_tests/modules/test_growpart.py b/tests/integration_tests/modules/test_growpart.py
index af1e3a15..67251817 100644
--- a/tests/integration_tests/modules/test_growpart.py
+++ b/tests/integration_tests/modules/test_growpart.py
@@ -1,22 +1,26 @@
+import json
import os
-import pytest
import pathlib
-import json
from uuid import uuid4
+
+import pytest
from pycloudlib.lxd.instance import LXDInstance
from cloudinit.subp import subp
from tests.integration_tests.instances import IntegrationInstance
-DISK_PATH = '/tmp/test_disk_setup_{}'.format(uuid4())
+DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4())
def setup_and_mount_lxd_disk(instance: LXDInstance):
- subp('lxc config device add {} test-disk-setup-disk disk source={}'.format(
- instance.name, DISK_PATH).split())
+ subp(
+ "lxc config device add {} test-disk-setup-disk disk source={}".format(
+ instance.name, DISK_PATH
+ ).split()
+ )
-@pytest.fixture(scope='class', autouse=True)
+@pytest.fixture(scope="class", autouse=True)
def create_disk():
"""Create 16M sparse file"""
pathlib.Path(DISK_PATH).touch()
@@ -50,13 +54,15 @@ class TestGrowPart:
"""Test growpart"""
def test_grow_part(self, client: IntegrationInstance):
- """Verify """
- log = client.read_from_file('/var/log/cloud-init.log')
- assert ("cc_growpart.py[INFO]: '/dev/sdb1' resized:"
- " changed (/dev/sdb, 1) from") in log
-
- lsblk = json.loads(client.execute('lsblk --json'))
- sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0]
- assert len(sdb['children']) == 1
- assert sdb['children'][0]['name'] == 'sdb1'
- assert sdb['size'] == '16M'
+ """Verify"""
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert (
+ "cc_growpart.py[INFO]: '/dev/sdb1' resized:"
+ " changed (/dev/sdb, 1) from" in log
+ )
+
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 1
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["size"] == "16M"
diff --git a/tests/integration_tests/modules/test_hotplug.py b/tests/integration_tests/modules/test_hotplug.py
index f5abc86f..0bad761e 100644
--- a/tests/integration_tests/modules/test_hotplug.py
+++ b/tests/integration_tests/modules/test_hotplug.py
@@ -1,8 +1,9 @@
-import pytest
import time
-import yaml
from collections import namedtuple
+import pytest
+import yaml
+
from tests.integration_tests.instances import IntegrationInstance
USER_DATA = """\
@@ -12,28 +13,28 @@ updates:
when: ['hotplug']
"""
-ip_addr = namedtuple('ip_addr', 'interface state ip4 ip6')
+ip_addr = namedtuple("ip_addr", "interface state ip4 ip6")
def _wait_till_hotplug_complete(client, expected_runs=1):
for _ in range(60):
- log = client.read_from_file('/var/log/cloud-init.log')
- if log.count('Exiting hotplug handler') == expected_runs:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if log.count("Exiting hotplug handler") == expected_runs:
return log
time.sleep(1)
- raise Exception('Waiting for hotplug handler failed')
+ raise Exception("Waiting for hotplug handler failed")
def _get_ip_addr(client):
ips = []
- lines = client.execute('ip --brief addr').split('\n')
+ lines = client.execute("ip --brief addr").split("\n")
for line in lines:
attributes = line.split()
interface, state = attributes[0], attributes[1]
ip4_cidr = attributes[2] if len(attributes) > 2 else None
ip6_cidr = attributes[3] if len(attributes) > 3 else None
- ip4 = ip4_cidr.split('/')[0] if ip4_cidr else None
- ip6 = ip6_cidr.split('/')[0] if ip6_cidr else None
+ ip4 = ip4_cidr.split("/")[0] if ip4_cidr else None
+ ip6 = ip6_cidr.split("/")[0] if ip6_cidr else None
ip = ip_addr(interface, state, ip4, ip6)
ips.append(ip)
return ips
@@ -47,10 +48,10 @@ def _get_ip_addr(client):
@pytest.mark.user_data(USER_DATA)
def test_hotplug_add_remove(client: IntegrationInstance):
ips_before = _get_ip_addr(client)
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Exiting hotplug handler' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Exiting hotplug handler" not in log
assert client.execute(
- 'test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules'
+ "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules"
).ok
# Add new NIC
@@ -62,11 +63,11 @@ def test_hotplug_add_remove(client: IntegrationInstance):
assert len(ips_after_add) == len(ips_before) + 1
assert added_ip not in [ip.ip4 for ip in ips_before]
assert added_ip in [ip.ip4 for ip in ips_after_add]
- assert new_addition.state == 'UP'
+ assert new_addition.state == "UP"
- netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml')
+ netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml")
config = yaml.safe_load(netplan_cfg)
- assert new_addition.interface in config['network']['ethernets']
+ assert new_addition.interface in config["network"]["ethernets"]
# Remove new NIC
client.instance.remove_network_interface(added_ip)
@@ -75,37 +76,37 @@ def test_hotplug_add_remove(client: IntegrationInstance):
assert len(ips_after_remove) == len(ips_before)
assert added_ip not in [ip.ip4 for ip in ips_after_remove]
- netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml')
+ netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml")
config = yaml.safe_load(netplan_cfg)
- assert new_addition.interface not in config['network']['ethernets']
+ assert new_addition.interface not in config["network"]["ethernets"]
- assert 'enabled' == client.execute(
- 'cloud-init devel hotplug-hook -s net query'
+ assert "enabled" == client.execute(
+ "cloud-init devel hotplug-hook -s net query"
)
@pytest.mark.openstack
def test_no_hotplug_in_userdata(client: IntegrationInstance):
ips_before = _get_ip_addr(client)
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Exiting hotplug handler' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Exiting hotplug handler" not in log
assert client.execute(
- 'test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules'
+ "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules"
).failed
# Add new NIC
client.instance.add_network_interface()
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'hotplug-hook' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "hotplug-hook" not in log
ips_after_add = _get_ip_addr(client)
if len(ips_after_add) == len(ips_before) + 1:
# We can see the device, but it should not have been brought up
new_ip = [ip for ip in ips_after_add if ip not in ips_before][0]
- assert new_ip.state == 'DOWN'
+ assert new_ip.state == "DOWN"
else:
assert len(ips_after_add) == len(ips_before)
- assert 'disabled' == client.execute(
- 'cloud-init devel hotplug-hook -s net query'
+ assert "disabled" == client.execute(
+ "cloud-init devel hotplug-hook -s net query"
)
diff --git a/tests/integration_tests/modules/test_jinja_templating.py b/tests/integration_tests/modules/test_jinja_templating.py
index fe8eff1a..7788c6f0 100644
--- a/tests/integration_tests/modules/test_jinja_templating.py
+++ b/tests/integration_tests/modules/test_jinja_templating.py
@@ -4,7 +4,6 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import verify_ordered_items_in_text
-
USER_DATA = """\
## template: jinja
#cloud-config
@@ -23,12 +22,12 @@ def test_runcmd_with_variable_substitution(client: IntegrationInstance):
we can also substitute variables from instance-data-sensitive
LP: #1931392.
"""
- hostname = client.execute('hostname').stdout.strip()
+ hostname = client.execute("hostname").stdout.strip()
expected = [
hostname,
- ('Merged cloud-init system config from /etc/cloud/cloud.cfg and '
- '/etc/cloud/cloud.cfg.d/'),
- hostname
+ "Merged cloud-init system config from /etc/cloud/cloud.cfg and "
+ "/etc/cloud/cloud.cfg.d/",
+ hostname,
]
- output = client.read_from_file('/var/tmp/runcmd_output')
+ output = client.read_from_file("/var/tmp/runcmd_output")
verify_ordered_items_in_text(expected, output)
diff --git a/tests/integration_tests/modules/test_keys_to_console.py b/tests/integration_tests/modules/test_keys_to_console.py
index e79db3c7..50899982 100644
--- a/tests/integration_tests/modules/test_keys_to_console.py
+++ b/tests/integration_tests/modules/test_keys_to_console.py
@@ -36,6 +36,7 @@ users:
@pytest.mark.user_data(BLACKLIST_USER_DATA)
class TestKeysToConsoleBlacklist:
"""Test that the blacklist options work as expected."""
+
@pytest.mark.parametrize("key_type", ["DSA", "ECDSA"])
def test_excluded_keys(self, class_client, key_type):
syslog = class_client.read_from_file("/var/log/syslog")
@@ -55,6 +56,7 @@ class TestAllKeysToConsoleBlacklist:
"""Test that when key blacklist contains all key types that
no header/footer are output.
"""
+
def test_header_excluded(self, class_client):
syslog = class_client.read_from_file("/var/log/syslog")
assert "BEGIN SSH HOST KEY FINGERPRINTS" not in syslog
@@ -67,6 +69,7 @@ class TestAllKeysToConsoleBlacklist:
@pytest.mark.user_data(DISABLED_USER_DATA)
class TestKeysToConsoleDisabled:
"""Test that output can be fully disabled."""
+
@pytest.mark.parametrize("key_type", ["DSA", "ECDSA", "ED25519", "RSA"])
def test_keys_excluded(self, class_client, key_type):
syslog = class_client.read_from_file("/var/log/syslog")
@@ -90,7 +93,7 @@ class TestKeysToConsoleEnabled:
"""Test that output can be enabled disabled."""
def test_duplicate_messaging_console_log(self, class_client):
- class_client.execute('cloud-init status --wait --long').ok
+ class_client.execute("cloud-init status --wait --long").ok
try:
console_log = class_client.instance.console_log()
except NotImplementedError:
@@ -98,13 +101,13 @@ class TestKeysToConsoleEnabled:
# log
pytest.skip("NotImplementedError when requesting console log")
return
- if console_log.lower() == 'no console output':
+ if console_log.lower() == "no console output":
# This test retries because we might not have the full console log
# on the first fetch. However, if we have no console output
# at all, we don't want to keep retrying as that would trigger
# another 5 minute wait on the pycloudlib side, which could
# leave us waiting for a couple hours
- pytest.fail('no console output')
+ pytest.fail("no console output")
return
msg = "no authorized SSH keys fingerprints found for user barfoo."
assert 1 == console_log.count(msg)
diff --git a/tests/integration_tests/modules/test_lxd_bridge.py b/tests/integration_tests/modules/test_lxd_bridge.py
index 65dce3c7..2cb3f4f3 100644
--- a/tests/integration_tests/modules/test_lxd_bridge.py
+++ b/tests/integration_tests/modules/test_lxd_bridge.py
@@ -8,7 +8,6 @@ import yaml
from tests.integration_tests.util import verify_clean_log
-
USER_DATA = """\
#cloud-config
lxd:
@@ -29,7 +28,6 @@ lxd:
@pytest.mark.no_container
@pytest.mark.user_data(USER_DATA)
class TestLxdBridge:
-
@pytest.mark.parametrize("binary_name", ["lxc", "lxd"])
def test_binaries_installed(self, class_client, binary_name):
"""Check that the expected LXD binaries are installed"""
diff --git a/tests/integration_tests/modules/test_ntp_servers.py b/tests/integration_tests/modules/test_ntp_servers.py
index c777a641..fc62e63b 100644
--- a/tests/integration_tests/modules/test_ntp_servers.py
+++ b/tests/integration_tests/modules/test_ntp_servers.py
@@ -9,8 +9,8 @@ and ``tests/cloud_tests/testcases/modules/ntp_chrony.yaml``)
"""
import re
-import yaml
import pytest
+import yaml
from tests.integration_tests.instances import IntegrationInstance
@@ -33,13 +33,13 @@ EXPECTED_POOLS = yaml.safe_load(USER_DATA)["ntp"]["pools"]
@pytest.mark.user_data(USER_DATA)
class TestNtpServers:
-
def test_ntp_installed(self, class_client: IntegrationInstance):
"""Test that `ntpd --version` succeeds, indicating installation."""
assert class_client.execute("ntpd --version").ok
- def test_dist_config_file_is_empty(self,
- class_client: IntegrationInstance):
+ def test_dist_config_file_is_empty(
+ self, class_client: IntegrationInstance
+ ):
"""Test that the distributed config file is empty.
(This test is skipped on all currently supported Ubuntu releases, so
@@ -56,13 +56,13 @@ class TestNtpServers:
assert re.search(
r"^server {} iburst".format(expected_server),
ntp_conf,
- re.MULTILINE
+ re.MULTILINE,
)
for expected_pool in EXPECTED_POOLS:
assert re.search(
r"^pool {} iburst".format(expected_pool),
ntp_conf,
- re.MULTILINE
+ re.MULTILINE,
)
def test_ntpq_servers(self, class_client: IntegrationInstance):
@@ -84,12 +84,12 @@ ntp:
@pytest.mark.user_data(CHRONY_DATA)
def test_chrony(client: IntegrationInstance):
- if client.execute('test -f /etc/chrony.conf').ok:
- chrony_conf = '/etc/chrony.conf'
+ if client.execute("test -f /etc/chrony.conf").ok:
+ chrony_conf = "/etc/chrony.conf"
else:
- chrony_conf = '/etc/chrony/chrony.conf'
+ chrony_conf = "/etc/chrony/chrony.conf"
contents = client.read_from_file(chrony_conf)
- assert 'server 172.16.15.14' in contents
+ assert "server 172.16.15.14" in contents
TIMESYNCD_DATA = """\
@@ -105,9 +105,9 @@ ntp:
@pytest.mark.user_data(TIMESYNCD_DATA)
def test_timesyncd(client: IntegrationInstance):
contents = client.read_from_file(
- '/etc/systemd/timesyncd.conf.d/cloud-init.conf'
+ "/etc/systemd/timesyncd.conf.d/cloud-init.conf"
)
- assert 'NTP=172.16.15.14' in contents
+ assert "NTP=172.16.15.14" in contents
EMPTY_NTP = """\
@@ -121,8 +121,8 @@ ntp:
@pytest.mark.user_data(EMPTY_NTP)
def test_empty_ntp(client: IntegrationInstance):
- assert client.execute('ntpd --version').ok
- assert client.execute('test -f /etc/ntp.conf.dist').failed
- assert 'pool.ntp.org iburst' in client.execute(
+ assert client.execute("ntpd --version").ok
+ assert client.execute("test -f /etc/ntp.conf.dist").failed
+ assert "pool.ntp.org iburst" in client.execute(
'grep -v "^#" /etc/ntp.conf'
)
diff --git a/tests/integration_tests/modules/test_package_update_upgrade_install.py b/tests/integration_tests/modules/test_package_update_upgrade_install.py
index 28d741bc..d668d81c 100644
--- a/tests/integration_tests/modules/test_package_update_upgrade_install.py
+++ b/tests/integration_tests/modules/test_package_update_upgrade_install.py
@@ -13,8 +13,8 @@ NOTE: the testcase for this looks for the command in history.log as
"""
import re
-import pytest
+import pytest
USER_DATA = """\
#cloud-config
@@ -29,7 +29,6 @@ package_upgrade: true
@pytest.mark.ubuntu
@pytest.mark.user_data(USER_DATA)
class TestPackageUpdateUpgradeInstall:
-
def assert_package_installed(self, pkg_out, name, version=None):
"""Check dpkg-query --show output for matching package name.
@@ -38,7 +37,8 @@ class TestPackageUpdateUpgradeInstall:
version.
"""
pkg_match = re.search(
- "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE)
+ "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE
+ )
if pkg_match:
installed_version = pkg_match.group("version")
if not version:
@@ -46,8 +46,10 @@ class TestPackageUpdateUpgradeInstall:
if installed_version.startswith(version):
return # Success
raise AssertionError(
- "Expected package version %s-%s not found. Found %s" %
- name, version, installed_version)
+ "Expected package version %s-%s not found. Found %s" % name,
+ version,
+ installed_version,
+ )
raise AssertionError("Package not installed: %s" % name)
def test_new_packages_are_installed(self, class_client):
@@ -58,11 +60,13 @@ class TestPackageUpdateUpgradeInstall:
def test_packages_were_updated(self, class_client):
out = class_client.execute(
- "grep ^Commandline: /var/log/apt/history.log")
+ "grep ^Commandline: /var/log/apt/history.log"
+ )
assert (
"Commandline: /usr/bin/apt-get --option=Dpkg::Options"
"::=--force-confold --option=Dpkg::options::=--force-unsafe-io "
- "--assume-yes --quiet install sl tree") in out
+ "--assume-yes --quiet install sl tree" in out
+ )
def test_packages_were_upgraded(self, class_client):
"""Test cloud-init-output for install & upgrade stuff."""
diff --git a/tests/integration_tests/modules/test_persistence.py b/tests/integration_tests/modules/test_persistence.py
index 00fdeaea..33527e1e 100644
--- a/tests/integration_tests/modules/test_persistence.py
+++ b/tests/integration_tests/modules/test_persistence.py
@@ -10,21 +10,23 @@ from tests.integration_tests.util import (
verify_ordered_items_in_text,
)
-
-PICKLE_PATH = Path('/var/lib/cloud/instance/obj.pkl')
-TEST_PICKLE = ASSETS_DIR / 'trusty_with_mime.pkl'
+PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl")
+TEST_PICKLE = ASSETS_DIR / "trusty_with_mime.pkl"
@pytest.mark.lxd_container
def test_log_message_on_missing_version_file(client: IntegrationInstance):
client.push_file(TEST_PICKLE, PICKLE_PATH)
client.restart()
- assert client.execute('cloud-init status --wait').ok
- log = client.read_from_file('/var/log/cloud-init.log')
- verify_ordered_items_in_text([
- "Unable to unpickle datasource: 'MIMEMultipart' object has no "
- "attribute 'policy'. Ignoring current cache.",
- 'no cache found',
- 'Searching for local data source',
- 'SUCCESS: found local data from DataSourceNoCloud'
- ], log)
+ assert client.execute("cloud-init status --wait").ok
+ log = client.read_from_file("/var/log/cloud-init.log")
+ verify_ordered_items_in_text(
+ [
+ "Unable to unpickle datasource: 'MIMEMultipart' object has no "
+ "attribute 'policy'. Ignoring current cache.",
+ "no cache found",
+ "Searching for local data source",
+ "SUCCESS: found local data from DataSourceNoCloud",
+ ],
+ log,
+ )
diff --git a/tests/integration_tests/modules/test_power_state_change.py b/tests/integration_tests/modules/test_power_state_change.py
index 5f3a32ac..a629029d 100644
--- a/tests/integration_tests/modules/test_power_state_change.py
+++ b/tests/integration_tests/modules/test_power_state_change.py
@@ -30,7 +30,7 @@ def _detect_reboot(instance: IntegrationInstance):
instance.instance.wait()
for _ in range(600):
try:
- log = instance.read_from_file('/var/log/cloud-init.log')
+ log = instance.read_from_file("/var/log/cloud-init.log")
boot_count = log.count("running 'init-local'")
if boot_count == 1:
instance.instance.wait()
@@ -40,11 +40,11 @@ def _detect_reboot(instance: IntegrationInstance):
pass
time.sleep(1)
else:
- raise Exception('Could not detect reboot')
+ raise Exception("Could not detect reboot")
def _can_connect(instance):
- return instance.execute('true').ok
+ return instance.execute("true").ok
# This test is marked unstable because even though it should be able to
@@ -55,36 +55,44 @@ def _can_connect(instance):
@pytest.mark.ubuntu
@pytest.mark.lxd_container
class TestPowerChange:
- @pytest.mark.parametrize('mode,delay,timeout,expected', [
- ('poweroff', 'now', '10', 'will execute: shutdown -P now msg'),
- ('reboot', 'now', '0', 'will execute: shutdown -r now msg'),
- ('halt', '+1', '0', 'will execute: shutdown -H +1 msg'),
- ])
- def test_poweroff(self, session_cloud: IntegrationCloud,
- mode, delay, timeout, expected):
+ @pytest.mark.parametrize(
+ "mode,delay,timeout,expected",
+ [
+ ("poweroff", "now", "10", "will execute: shutdown -P now msg"),
+ ("reboot", "now", "0", "will execute: shutdown -r now msg"),
+ ("halt", "+1", "0", "will execute: shutdown -H +1 msg"),
+ ],
+ )
+ def test_poweroff(
+ self, session_cloud: IntegrationCloud, mode, delay, timeout, expected
+ ):
with session_cloud.launch(
user_data=USER_DATA.format(
- delay=delay, mode=mode, timeout=timeout, condition='true'),
- launch_kwargs={'wait': False},
+ delay=delay, mode=mode, timeout=timeout, condition="true"
+ ),
+ launch_kwargs={"wait": False},
) as instance:
- if mode == 'reboot':
+ if mode == "reboot":
_detect_reboot(instance)
else:
instance.instance.wait_for_stop()
instance.instance.start(wait=True)
- log = instance.read_from_file('/var/log/cloud-init.log')
+ log = instance.read_from_file("/var/log/cloud-init.log")
assert _can_connect(instance)
lines_to_check = [
- 'Running module power-state-change',
+ "Running module power-state-change",
expected,
"running 'init-local'",
- 'config-power-state-change already ran',
+ "config-power-state-change already ran",
]
verify_ordered_items_in_text(lines_to_check, log)
- @pytest.mark.user_data(USER_DATA.format(delay='0', mode='poweroff',
- timeout='0', condition='false'))
+ @pytest.mark.user_data(
+ USER_DATA.format(
+ delay="0", mode="poweroff", timeout="0", condition="false"
+ )
+ )
def test_poweroff_false_condition(self, client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
assert _can_connect(client)
- assert 'Condition was false. Will not perform state change' in log
+ assert "Condition was false. Will not perform state change" in log
diff --git a/tests/integration_tests/modules/test_puppet.py b/tests/integration_tests/modules/test_puppet.py
index f40a6ca3..1bd9cee4 100644
--- a/tests/integration_tests/modules/test_puppet.py
+++ b/tests/integration_tests/modules/test_puppet.py
@@ -15,9 +15,9 @@ puppet:
@pytest.mark.user_data(SERVICE_DATA)
def test_puppet_service(client: IntegrationInstance):
"""Basic test that puppet gets installed and runs."""
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
verify_clean_log(log)
- assert client.execute('systemctl is-active puppet').ok
+ assert client.execute("systemctl is-active puppet").ok
assert "Running command ['puppet', 'agent'" not in log
@@ -35,5 +35,5 @@ puppet:
@pytest.mark.user_data(EXEC_DATA)
def test_pupet_exec(client: IntegrationInstance):
"""Basic test that puppet gets installed and runs."""
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
assert "Running command ['puppet', 'agent', '--noop']" in log
diff --git a/tests/integration_tests/modules/test_set_hostname.py b/tests/integration_tests/modules/test_set_hostname.py
index e7f7f6b6..ae0aeae9 100644
--- a/tests/integration_tests/modules/test_set_hostname.py
+++ b/tests/integration_tests/modules/test_set_hostname.py
@@ -11,7 +11,6 @@ after the system is boot.
import pytest
-
USER_DATA_HOSTNAME = """\
#cloud-config
hostname: cloudinit2
@@ -34,7 +33,6 @@ fqdn: cloudinit2.test.io
@pytest.mark.ci
class TestHostname:
-
@pytest.mark.user_data(USER_DATA_HOSTNAME)
def test_hostname(self, client):
hostname_output = client.execute("hostname")
@@ -59,6 +57,8 @@ class TestHostname:
assert "cloudinit2.i9n.cloud-init.io" in fqdn_output.strip()
host_output = client.execute("grep ^127 /etc/hosts")
- assert '127.0.1.1 {} {}'.format(
- fqdn_output, hostname_output) in host_output
- assert '127.0.0.1 localhost' in host_output
+ assert (
+ "127.0.1.1 {} {}".format(fqdn_output, hostname_output)
+ in host_output
+ )
+ assert "127.0.0.1 localhost" in host_output
diff --git a/tests/integration_tests/modules/test_set_password.py b/tests/integration_tests/modules/test_set_password.py
index ac9db19d..e0f8b692 100644
--- a/tests/integration_tests/modules/test_set_password.py
+++ b/tests/integration_tests/modules/test_set_password.py
@@ -15,7 +15,6 @@ import yaml
from tests.integration_tests.util import retry
-
COMMON_USER_DATA = """\
#cloud-config
ssh_pwauth: yes
@@ -42,7 +41,9 @@ Uh69tP4GSrGW5XKHxMLiKowJgm/"
lock_passwd: false
"""
-LIST_USER_DATA = COMMON_USER_DATA + """
+LIST_USER_DATA = (
+ COMMON_USER_DATA
+ + """
chpasswd:
list:
- tom:mypassword123!
@@ -50,8 +51,11 @@ chpasswd:
- harry:RANDOM
- mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89
"""
+)
-STRING_USER_DATA = COMMON_USER_DATA + """
+STRING_USER_DATA = (
+ COMMON_USER_DATA
+ + """
chpasswd:
list: |
tom:mypassword123!
@@ -59,6 +63,7 @@ chpasswd:
harry:RANDOM
mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89
"""
+)
USERS_DICTS = yaml.safe_load(COMMON_USER_DATA)["users"]
USERS_PASSWD_VALUES = {
@@ -141,13 +146,13 @@ class Mixin:
# log
pytest.skip("NotImplementedError when requesting console log")
return
- if console_log.lower() == 'no console output':
+ if console_log.lower() == "no console output":
# This test retries because we might not have the full console log
# on the first fetch. However, if we have no console output
# at all, we don't want to keep retrying as that would trigger
# another 5 minute wait on the pycloudlib side, which could
# leave us waiting for a couple hours
- pytest.fail('no console output')
+ pytest.fail("no console output")
return
assert "dick:" in console_log
assert "harry:" in console_log
diff --git a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
index cf14d0b0..89b49576 100644
--- a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
+++ b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
@@ -14,7 +14,6 @@ import pytest
from tests.integration_tests.util import retry
-
USER_DATA_SSH_AUTHKEY_DISABLE = """\
#cloud-config
no_ssh_fingerprints: true
@@ -32,13 +31,13 @@ ssh_authorized_keys:
@pytest.mark.ci
class TestSshAuthkeyFingerprints:
-
@pytest.mark.user_data(USER_DATA_SSH_AUTHKEY_DISABLE)
def test_ssh_authkey_fingerprints_disable(self, client):
cloudinit_output = client.read_from_file("/var/log/cloud-init.log")
assert (
"Skipping module named ssh-authkey-fingerprints, "
- "logging of SSH fingerprints disabled") in cloudinit_output
+ "logging of SSH fingerprints disabled" in cloudinit_output
+ )
# retry decorator here because it can take some time to be reflected
# in syslog
@@ -47,7 +46,7 @@ class TestSshAuthkeyFingerprints:
def test_ssh_authkey_fingerprints_enable(self, client):
syslog_output = client.read_from_file("/var/log/syslog")
- assert re.search(r'256 SHA256:.*(ECDSA)', syslog_output) is not None
- assert re.search(r'256 SHA256:.*(ED25519)', syslog_output) is not None
- assert re.search(r'1024 SHA256:.*(DSA)', syslog_output) is None
- assert re.search(r'2048 SHA256:.*(RSA)', syslog_output) is None
+ assert re.search(r"256 SHA256:.*(ECDSA)", syslog_output) is not None
+ assert re.search(r"256 SHA256:.*(ED25519)", syslog_output) is not None
+ assert re.search(r"1024 SHA256:.*(DSA)", syslog_output) is None
+ assert re.search(r"2048 SHA256:.*(RSA)", syslog_output) is None
diff --git a/tests/integration_tests/modules/test_ssh_generate.py b/tests/integration_tests/modules/test_ssh_generate.py
index 60c36982..1dd0adf1 100644
--- a/tests/integration_tests/modules/test_ssh_generate.py
+++ b/tests/integration_tests/modules/test_ssh_generate.py
@@ -10,7 +10,6 @@ keys were created.
import pytest
-
USER_DATA = """\
#cloud-config
ssh_genkeytypes:
@@ -23,28 +22,27 @@ authkey_hash: sha512
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestSshKeysGenerate:
-
@pytest.mark.parametrize(
- "ssh_key_path", (
+ "ssh_key_path",
+ (
"/etc/ssh/ssh_host_dsa_key.pub",
"/etc/ssh/ssh_host_dsa_key",
"/etc/ssh/ssh_host_rsa_key.pub",
"/etc/ssh/ssh_host_rsa_key",
- )
+ ),
)
def test_ssh_keys_not_generated(self, ssh_key_path, class_client):
- out = class_client.execute(
- "test -e {}".format(ssh_key_path)
- )
+ out = class_client.execute("test -e {}".format(ssh_key_path))
assert out.failed
@pytest.mark.parametrize(
- "ssh_key_path", (
+ "ssh_key_path",
+ (
"/etc/ssh/ssh_host_ecdsa_key.pub",
"/etc/ssh/ssh_host_ecdsa_key",
"/etc/ssh/ssh_host_ed25519_key.pub",
"/etc/ssh/ssh_host_ed25519_key",
- )
+ ),
)
def test_ssh_keys_generated(self, ssh_key_path, class_client):
out = class_client.read_from_file(ssh_key_path)
diff --git a/tests/integration_tests/modules/test_ssh_keys_provided.py b/tests/integration_tests/modules/test_ssh_keys_provided.py
index 6aae96ae..b79f18eb 100644
--- a/tests/integration_tests/modules/test_ssh_keys_provided.py
+++ b/tests/integration_tests/modules/test_ssh_keys_provided.py
@@ -9,7 +9,6 @@ system.
import pytest
-
USER_DATA = """\
#cloud-config
disable_root: false
@@ -82,44 +81,33 @@ ssh_keys:
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestSshKeysProvided:
-
@pytest.mark.parametrize(
"config_path,expected_out",
(
(
"/etc/ssh/ssh_host_dsa_key.pub",
- (
- "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R"
- "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM"
- ),
+ "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R"
+ "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM",
),
(
"/etc/ssh/ssh_host_dsa_key",
- (
- "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr"
- "hOVAfzZ6+jklP"
- ),
+ "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr"
+ "hOVAfzZ6+jklP",
),
(
"/etc/ssh/ssh_host_rsa_key.pub",
- (
- "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT"
- "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4"
- ),
+ "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT"
+ "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4",
),
(
"/etc/ssh/ssh_host_rsa_key",
- (
- "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un"
- "RQvLZpMRdywBm"
- ),
+ "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un"
+ "RQvLZpMRdywBm",
),
(
"/etc/ssh/ssh_host_rsa_key-cert.pub",
- (
- "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg"
- "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD"
- ),
+ "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg"
+ "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD",
),
(
"/etc/ssh/sshd_config",
@@ -127,33 +115,25 @@ class TestSshKeysProvided:
),
(
"/etc/ssh/ssh_host_ecdsa_key.pub",
- (
- "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB"
- "BBFsS5Tvky/IC/dXhE/afxxU"
- ),
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB"
+ "BBFsS5Tvky/IC/dXhE/afxxU",
),
(
"/etc/ssh/ssh_host_ecdsa_key",
- (
- "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY"
- "5mpZqxgX4vcgb"
- ),
+ "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY"
+ "5mpZqxgX4vcgb",
),
(
"/etc/ssh/ssh_host_ed25519_key.pub",
- (
- "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6"
- "G15dqjQ2XkNVOEnb5"
- ),
+ "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6"
+ "G15dqjQ2XkNVOEnb5",
),
(
"/etc/ssh/ssh_host_ed25519_key",
- (
- "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT"
- "OhteXao0Nl5DVThJ2+Q"
- ),
+ "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT"
+ "OhteXao0Nl5DVThJ2+Q",
),
- )
+ ),
)
def test_ssh_provided_keys(self, config_path, expected_out, class_client):
out = class_client.read_from_file(config_path).strip()
diff --git a/tests/integration_tests/modules/test_ssh_keysfile.py b/tests/integration_tests/modules/test_ssh_keysfile.py
index b39454e6..8330a1ce 100644
--- a/tests/integration_tests/modules/test_ssh_keysfile.py
+++ b/tests/integration_tests/modules/test_ssh_keysfile.py
@@ -1,15 +1,16 @@
+from io import StringIO
+
import paramiko
import pytest
-from io import StringIO
from paramiko.ssh_exception import SSHException
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import get_test_rsa_keypair
-TEST_USER1_KEYS = get_test_rsa_keypair('test1')
-TEST_USER2_KEYS = get_test_rsa_keypair('test2')
-TEST_DEFAULT_KEYS = get_test_rsa_keypair('test3')
+TEST_USER1_KEYS = get_test_rsa_keypair("test1")
+TEST_USER2_KEYS = get_test_rsa_keypair("test2")
+TEST_DEFAULT_KEYS = get_test_rsa_keypair("test3")
_USERDATA = """\
#cloud-config
@@ -26,7 +27,7 @@ users:
ssh_authorized_keys:
- {user2}
""".format(
- bootcmd='{bootcmd}',
+ bootcmd="{bootcmd}",
default=TEST_DEFAULT_KEYS.public_key,
user1=TEST_USER1_KEYS.public_key,
user2=TEST_USER2_KEYS.public_key,
@@ -37,9 +38,9 @@ def common_verify(client, expected_keys):
for user, filename, keys in expected_keys:
# Ensure key is in the key file
contents = client.read_from_file(filename)
- if user in ['ubuntu', 'root']:
- lines = contents.split('\n')
- if user == 'root':
+ if user in ["ubuntu", "root"]:
+ lines = contents.split("\n")
+ if user == "root":
# Our personal public key gets added by pycloudlib in
# addition to the default `ssh_authorized_keys`
assert len(lines) == 2
@@ -54,8 +55,9 @@ def common_verify(client, expected_keys):
# Ensure we can actually connect
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- paramiko_key = paramiko.RSAKey.from_private_key(StringIO(
- keys.private_key))
+ paramiko_key = paramiko.RSAKey.from_private_key(
+ StringIO(keys.private_key)
+ )
# Will fail with AuthenticationException if
# we cannot connect
@@ -71,8 +73,11 @@ def common_verify(client, expected_keys):
other_users = [u[0] for u in expected_keys if u[2] != keys]
for other_user in other_users:
with pytest.raises(SSHException):
- print('trying to connect as {} with key from {}'.format(
- other_user, user))
+ print(
+ "trying to connect as {} with key from {}".format(
+ other_user, user
+ )
+ )
ssh.connect(
client.instance.ip,
username=other_user,
@@ -83,37 +88,38 @@ def common_verify(client, expected_keys):
# Ensure we haven't messed with any /home permissions
# See LP: #1940233
- home_dir = '/home/{}'.format(user)
+ home_dir = "/home/{}".format(user)
# Home permissions aren't consistent between releases. On ubuntu
# this can change to 750 once focal is unsupported.
if ImageSpecification.from_os_image().release in ("bionic", "focal"):
- home_perms = '755'
+ home_perms = "755"
else:
- home_perms = '750'
- if user == 'root':
- home_dir = '/root'
- home_perms = '700'
- assert '{} {}'.format(user, home_perms) == client.execute(
+ home_perms = "750"
+ if user == "root":
+ home_dir = "/root"
+ home_perms = "700"
+ assert "{} {}".format(user, home_perms) == client.execute(
'stat -c "%U %a" {}'.format(home_dir)
)
if client.execute("test -d {}/.ssh".format(home_dir)).ok:
- assert '{} 700'.format(user) == client.execute(
+ assert "{} 700".format(user) == client.execute(
'stat -c "%U %a" {}/.ssh'.format(home_dir)
)
- assert '{} 600'.format(user) == client.execute(
+ assert "{} 600".format(user) == client.execute(
'stat -c "%U %a" {}'.format(filename)
)
# Also ensure ssh-keygen works as expected
- client.execute('mkdir {}/.ssh'.format(home_dir))
+ client.execute("mkdir {}/.ssh".format(home_dir))
assert client.execute(
"ssh-keygen -b 2048 -t rsa -f {}/.ssh/id_rsa -q -N ''".format(
- home_dir)
+ home_dir
+ )
).ok
- assert client.execute('test -f {}/.ssh/id_rsa'.format(home_dir))
- assert client.execute('test -f {}/.ssh/id_rsa.pub'.format(home_dir))
+ assert client.execute("test -f {}/.ssh/id_rsa".format(home_dir))
+ assert client.execute("test -f {}/.ssh/id_rsa.pub".format(home_dir))
- assert 'root 755' == client.execute('stat -c "%U %a" /home')
+ assert "root 755" == client.execute('stat -c "%U %a" /home')
DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""')
@@ -123,75 +129,96 @@ DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""')
@pytest.mark.user_data(DEFAULT_KEYS_USERDATA)
def test_authorized_keys_default(client: IntegrationInstance):
expected_keys = [
- ('test_user1', '/home/test_user1/.ssh/authorized_keys',
- TEST_USER1_KEYS),
- ('test_user2', '/home/test_user2/.ssh/authorized_keys',
- TEST_USER2_KEYS),
- ('ubuntu', '/home/ubuntu/.ssh/authorized_keys',
- TEST_DEFAULT_KEYS),
- ('root', '/root/.ssh/authorized_keys', TEST_DEFAULT_KEYS),
+ (
+ "test_user1",
+ "/home/test_user1/.ssh/authorized_keys",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/home/test_user2/.ssh/authorized_keys",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/home/ubuntu/.ssh/authorized_keys", TEST_DEFAULT_KEYS),
+ ("root", "/root/.ssh/authorized_keys", TEST_DEFAULT_KEYS),
]
common_verify(client, expected_keys)
-AUTHORIZED_KEYS2_USERDATA = _USERDATA.format(bootcmd=(
- "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
- "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' "
- "/etc/ssh/sshd_config"))
+AUTHORIZED_KEYS2_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' "
+ "/etc/ssh/sshd_config"
+ )
+)
@pytest.mark.ubuntu
@pytest.mark.user_data(AUTHORIZED_KEYS2_USERDATA)
def test_authorized_keys2(client: IntegrationInstance):
expected_keys = [
- ('test_user1', '/home/test_user1/.ssh/authorized_keys2',
- TEST_USER1_KEYS),
- ('test_user2', '/home/test_user2/.ssh/authorized_keys2',
- TEST_USER2_KEYS),
- ('ubuntu', '/home/ubuntu/.ssh/authorized_keys2',
- TEST_DEFAULT_KEYS),
- ('root', '/root/.ssh/authorized_keys2', TEST_DEFAULT_KEYS),
+ (
+ "test_user1",
+ "/home/test_user1/.ssh/authorized_keys2",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/home/test_user2/.ssh/authorized_keys2",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/home/ubuntu/.ssh/authorized_keys2", TEST_DEFAULT_KEYS),
+ ("root", "/root/.ssh/authorized_keys2", TEST_DEFAULT_KEYS),
]
common_verify(client, expected_keys)
-NESTED_KEYS_USERDATA = _USERDATA.format(bootcmd=(
- "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
- "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' "
- "/etc/ssh/sshd_config"))
+NESTED_KEYS_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' "
+ "/etc/ssh/sshd_config"
+ )
+)
@pytest.mark.ubuntu
@pytest.mark.user_data(NESTED_KEYS_USERDATA)
def test_nested_keys(client: IntegrationInstance):
expected_keys = [
- ('test_user1', '/home/test_user1/foo/bar/ssh/keys',
- TEST_USER1_KEYS),
- ('test_user2', '/home/test_user2/foo/bar/ssh/keys',
- TEST_USER2_KEYS),
- ('ubuntu', '/home/ubuntu/foo/bar/ssh/keys',
- TEST_DEFAULT_KEYS),
- ('root', '/root/foo/bar/ssh/keys', TEST_DEFAULT_KEYS),
+ ("test_user1", "/home/test_user1/foo/bar/ssh/keys", TEST_USER1_KEYS),
+ ("test_user2", "/home/test_user2/foo/bar/ssh/keys", TEST_USER2_KEYS),
+ ("ubuntu", "/home/ubuntu/foo/bar/ssh/keys", TEST_DEFAULT_KEYS),
+ ("root", "/root/foo/bar/ssh/keys", TEST_DEFAULT_KEYS),
]
common_verify(client, expected_keys)
-EXTERNAL_KEYS_USERDATA = _USERDATA.format(bootcmd=(
- "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
- "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' "
- "/etc/ssh/sshd_config"))
+EXTERNAL_KEYS_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' "
+ "/etc/ssh/sshd_config"
+ )
+)
@pytest.mark.ubuntu
@pytest.mark.user_data(EXTERNAL_KEYS_USERDATA)
def test_external_keys(client: IntegrationInstance):
expected_keys = [
- ('test_user1', '/etc/ssh/authorized_keys/test_user1/keys',
- TEST_USER1_KEYS),
- ('test_user2', '/etc/ssh/authorized_keys/test_user2/keys',
- TEST_USER2_KEYS),
- ('ubuntu', '/etc/ssh/authorized_keys/ubuntu/keys',
- TEST_DEFAULT_KEYS),
- ('root', '/etc/ssh/authorized_keys/root/keys', TEST_DEFAULT_KEYS),
+ (
+ "test_user1",
+ "/etc/ssh/authorized_keys/test_user1/keys",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/etc/ssh/authorized_keys/test_user2/keys",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/etc/ssh/authorized_keys/ubuntu/keys", TEST_DEFAULT_KEYS),
+ ("root", "/etc/ssh/authorized_keys/root/keys", TEST_DEFAULT_KEYS),
]
common_verify(client, expected_keys)
diff --git a/tests/integration_tests/modules/test_user_events.py b/tests/integration_tests/modules/test_user_events.py
index fffa0746..e4a4241f 100644
--- a/tests/integration_tests/modules/test_user_events.py
+++ b/tests/integration_tests/modules/test_user_events.py
@@ -3,8 +3,9 @@
This is currently limited to applying network config on BOOT events.
"""
-import pytest
import re
+
+import pytest
import yaml
from tests.integration_tests.instances import IntegrationInstance
@@ -13,16 +14,16 @@ from tests.integration_tests.instances import IntegrationInstance
def _add_dummy_bridge_to_netplan(client: IntegrationInstance):
# Update netplan configuration to ensure it doesn't change on reboot
netplan = yaml.safe_load(
- client.execute('cat /etc/netplan/50-cloud-init.yaml')
+ client.execute("cat /etc/netplan/50-cloud-init.yaml")
)
# Just a dummy bridge to do nothing
try:
- netplan['network']['bridges']['dummy0'] = {'dhcp4': False}
+ netplan["network"]["bridges"]["dummy0"] = {"dhcp4": False}
except KeyError:
- netplan['network']['bridges'] = {'dummy0': {'dhcp4': False}}
+ netplan["network"]["bridges"] = {"dummy0": {"dhcp4": False}}
dumped_netplan = yaml.dump(netplan)
- client.write_to_file('/etc/netplan/50-cloud-init.yaml', dumped_netplan)
+ client.write_to_file("/etc/netplan/50-cloud-init.yaml", dumped_netplan)
@pytest.mark.lxd_container
@@ -32,19 +33,19 @@ def _add_dummy_bridge_to_netplan(client: IntegrationInstance):
@pytest.mark.oci
@pytest.mark.openstack
def test_boot_event_disabled_by_default(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
- if 'network config is disabled' in log:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "network config is disabled" in log:
pytest.skip("network config disabled. Test doesn't apply")
- assert 'Applying network configuration' in log
- assert 'dummy0' not in client.execute('ls /sys/class/net')
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
_add_dummy_bridge_to_netplan(client)
- client.execute('rm /var/log/cloud-init.log')
+ client.execute("rm /var/log/cloud-init.log")
client.restart()
- log2 = client.read_from_file('/var/log/cloud-init.log')
+ log2 = client.read_from_file("/var/log/cloud-init.log")
- if 'cache invalid in datasource' in log2:
+ if "cache invalid in datasource" in log2:
# Invalid cache will get cleared, meaning we'll create a new
# "instance" and apply networking config, so events aren't
# really relevant here
@@ -53,8 +54,9 @@ def test_boot_event_disabled_by_default(client: IntegrationInstance):
# We attempt to apply network config twice on every boot.
# Ensure neither time works.
assert 2 == len(
- re.findall(r"Event Denied: scopes=\['network'\] EventType=boot[^-]",
- log2)
+ re.findall(
+ r"Event Denied: scopes=\['network'\] EventType=boot[^-]", log2
+ )
)
assert 2 == log2.count(
"Event Denied: scopes=['network'] EventType=boot-legacy"
@@ -64,30 +66,30 @@ def test_boot_event_disabled_by_default(client: IntegrationInstance):
" nor datasource network update allowed"
)
- assert 'dummy0' in client.execute('ls /sys/class/net')
+ assert "dummy0" in client.execute("ls /sys/class/net")
def _test_network_config_applied_on_reboot(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
- if 'network config is disabled' in log:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "network config is disabled" in log:
pytest.skip("network config disabled. Test doesn't apply")
- assert 'Applying network configuration' in log
- assert 'dummy0' not in client.execute('ls /sys/class/net')
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
_add_dummy_bridge_to_netplan(client)
client.execute('echo "" > /var/log/cloud-init.log')
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
- if 'cache invalid in datasource' in log:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "cache invalid in datasource" in log:
# Invalid cache will get cleared, meaning we'll create a new
# "instance" and apply networking config, so events aren't
# really relevant here
pytest.skip("Test only valid for existing instances")
- assert 'Event Allowed: scope=network EventType=boot' in log
- assert 'Applying network configuration' in log
- assert 'dummy0' not in client.execute('ls /sys/class/net')
+ assert "Event Allowed: scope=network EventType=boot" in log
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
@pytest.mark.azure
diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py
index bcb17b7f..3d1358ce 100644
--- a/tests/integration_tests/modules/test_users_groups.py
+++ b/tests/integration_tests/modules/test_users_groups.py
@@ -11,7 +11,6 @@ import pytest
from tests.integration_tests.clouds import ImageSpecification
from tests.integration_tests.instances import IntegrationInstance
-
USER_DATA = """\
#cloud-config
# Add groups to the system
@@ -84,7 +83,9 @@ class TestUsersGroups:
assert re.search(regex, result.stdout) is not None, (
"'getent {}' resulted in '{}', "
"but expected to match regex {}".format(
- ' '.join(getent_args), result.stdout, regex))
+ " ".join(getent_args), result.stdout, regex
+ )
+ )
def test_user_root_in_secret(self, class_client):
"""Test root user is in 'secret' group."""
@@ -105,19 +106,21 @@ def test_sudoers_includedir(client: IntegrationInstance):
https://github.com/canonical/cloud-init/pull/783
"""
if ImageSpecification.from_os_image().release in [
- 'xenial', 'bionic', 'focal'
+ "xenial",
+ "bionic",
+ "focal",
]:
raise pytest.skip(
- 'Test requires version of sudo installed on groovy and later'
+ "Test requires version of sudo installed on groovy and later"
)
client.execute("sed -i 's/#include/@include/g' /etc/sudoers")
- sudoers = client.read_from_file('/etc/sudoers')
- if '@includedir /etc/sudoers.d' not in sudoers:
+ sudoers = client.read_from_file("/etc/sudoers")
+ if "@includedir /etc/sudoers.d" not in sudoers:
client.execute("echo '@includedir /etc/sudoers.d' >> /etc/sudoers")
client.instance.clean()
client.restart()
- sudoers = client.read_from_file('/etc/sudoers')
+ sudoers = client.read_from_file("/etc/sudoers")
- assert '#includedir' not in sudoers
- assert sudoers.count('includedir /etc/sudoers.d') == 1
+ assert "#includedir" not in sudoers
+ assert sudoers.count("includedir /etc/sudoers.d") == 1
diff --git a/tests/integration_tests/modules/test_version_change.py b/tests/integration_tests/modules/test_version_change.py
index f28079d4..3168cd60 100644
--- a/tests/integration_tests/modules/test_version_change.py
+++ b/tests/integration_tests/modules/test_version_change.py
@@ -5,39 +5,40 @@ import pytest
from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import ASSETS_DIR, verify_clean_log
-
-PICKLE_PATH = Path('/var/lib/cloud/instance/obj.pkl')
-TEST_PICKLE = ASSETS_DIR / 'test_version_change.pkl'
+PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl")
+TEST_PICKLE = ASSETS_DIR / "test_version_change.pkl"
def _assert_no_pickle_problems(log):
- assert 'Failed loading pickled blob' not in log
+ assert "Failed loading pickled blob" not in log
verify_clean_log(log)
def test_reboot_without_version_change(client: IntegrationInstance):
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Python version change detected' not in log
- assert 'Cache compatibility status is currently unknown.' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected" not in log
+ assert "Cache compatibility status is currently unknown." not in log
_assert_no_pickle_problems(log)
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Python version change detected' not in log
- assert 'Could not determine Python version used to write cache' not in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected" not in log
+ assert "Could not determine Python version used to write cache" not in log
_assert_no_pickle_problems(log)
# Now ensure that loading a bad pickle gives us problems
client.push_file(TEST_PICKLE, PICKLE_PATH)
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
+ log = client.read_from_file("/var/log/cloud-init.log")
# no cache found is an "expected" upgrade error, and
# "Failed" means we're unable to load the pickle
- assert any([
- 'Failed loading pickled blob from {}'.format(PICKLE_PATH) in log,
- 'no cache found' in log
- ])
+ assert any(
+ [
+ "Failed loading pickled blob from {}".format(PICKLE_PATH) in log,
+ "no cache found" in log,
+ ]
+ )
@pytest.mark.ec2
@@ -54,8 +55,8 @@ def test_cache_purged_on_version_change(client: IntegrationInstance):
client.push_file(TEST_PICKLE, PICKLE_PATH)
client.execute("echo '1.0' > /var/lib/cloud/data/python-version")
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
- assert 'Python version change detected. Purging cache' in log
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected. Purging cache" in log
_assert_no_pickle_problems(log)
@@ -65,11 +66,11 @@ def test_log_message_on_missing_version_file(client: IntegrationInstance):
client.execute("rm /var/lib/cloud/data/python-version")
client.execute("rm /var/log/cloud-init.log")
client.restart()
- log = client.read_from_file('/var/log/cloud-init.log')
- if 'no cache found' not in log:
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "no cache found" not in log:
# We don't expect the python version file to exist if we have no
# pre-existing cache
assert (
- 'Writing python-version file. '
- 'Cache compatibility status is currently unknown.'
- ) in log
+ "Writing python-version file. "
+ "Cache compatibility status is currently unknown." in log
+ )
diff --git a/tests/integration_tests/modules/test_write_files.py b/tests/integration_tests/modules/test_write_files.py
index 1d532fac..1eb7e945 100644
--- a/tests/integration_tests/modules/test_write_files.py
+++ b/tests/integration_tests/modules/test_write_files.py
@@ -7,8 +7,8 @@ and then checks if those files were created during boot.
``tests/cloud_tests/testcases/modules/write_files.yaml``.)"""
import base64
-import pytest
+import pytest
ASCII_TEXT = "ASCII text"
B64_CONTENT = base64.b64encode(ASCII_TEXT.encode("utf-8"))
@@ -50,25 +50,30 @@ write_files:
defer: true
owner: 'myuser'
permissions: '0644'
-""".format(B64_CONTENT.decode("ascii"))
+""".format(
+ B64_CONTENT.decode("ascii")
+)
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestWriteFiles:
-
@pytest.mark.parametrize(
- "cmd,expected_out", (
+ "cmd,expected_out",
+ (
("file /root/file_b64", ASCII_TEXT),
("md5sum </root/file_binary", "3801184b97bb8c6e63fa0e1eae2920d7"),
- ("sha256sum </root/file_binary", (
+ (
+ "sha256sum </root/file_binary",
"2c791c4037ea5bd7e928d6a87380f8ba"
- "7a803cd83d5e4f269e28f5090f0f2c9a"
- )),
- ("file /root/file_gzip",
- "POSIX shell script, ASCII text executable"),
+ "7a803cd83d5e4f269e28f5090f0f2c9a",
+ ),
+ (
+ "file /root/file_gzip",
+ "POSIX shell script, ASCII text executable",
+ ),
("file /root/file_text", ASCII_TEXT),
- )
+ ),
)
def test_write_files(self, cmd, expected_out, class_client):
out = class_client.execute(cmd)
@@ -82,6 +87,7 @@ class TestWriteFiles:
"""
out = class_client.read_from_file("/home/testuser/my-file")
assert "echo 'hello world!'" == out
- assert class_client.execute(
- 'stat -c "%U %a" /home/testuser/my-file'
- ) == 'myuser 644'
+ assert (
+ class_client.execute('stat -c "%U %a" /home/testuser/my-file')
+ == "myuser 644"
+ )