Diffstat (limited to 'tests/integration_tests/modules')
26 files changed, 524 insertions, 472 deletions
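The hunks below are essentially a mechanical restyle of the integration-test modules (double-quoted strings, re-wrapped long lines, sorted imports), with no apparent changes to test logic in the hunks shown. As a minimal, hypothetical sketch of the kind of rewrite being applied, assuming only that the `black` package is installed (the project's actual formatting options live in its own tool configuration, so the default FileMode() here is purely illustrative), the quote normalization visible in the first hunks can be reproduced with black's Python API:

import black

# One of the lines changed in test_apt.py below, in its pre-restyle form.
SRC = "sources_list = class_client.read_from_file('/etc/apt/sources.list')\n"

# format_str() applies the same rewrite the diff shows: single quotes are
# normalized to double quotes and over-long lines are re-wrapped.
print(black.format_str(SRC, mode=black.FileMode()), end="")

With default settings this should print the statement as it appears on the `+` side of the corresponding hunk; the import reordering seen in several files comes from a separate import sorter rather than from black itself.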
diff --git a/tests/integration_tests/modules/test_apt.py b/tests/integration_tests/modules/test_apt.py index f5f6c813..48f398d1 100644 --- a/tests/integration_tests/modules/test_apt.py +++ b/tests/integration_tests/modules/test_apt.py @@ -3,12 +3,11 @@ import re import pytest -from cloudinit.config import cc_apt_configure from cloudinit import gpg +from cloudinit.config import cc_apt_configure from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance - USER_DATA = """\ #cloud-config apt: @@ -104,14 +103,15 @@ class TestApt: """Return all keys in /etc/apt/trusted.gpg.d/ and /etc/apt/trusted.gpg in human readable format. Mimics the output of apt-key finger """ - list_cmd = ' '.join(gpg.GPG_LIST) + ' ' + list_cmd = " ".join(gpg.GPG_LIST) + " " keys = class_client.execute(list_cmd + cc_apt_configure.APT_LOCAL_KEYS) print(keys) files = class_client.execute( - 'ls ' + cc_apt_configure.APT_TRUSTED_GPG_DIR) + "ls " + cc_apt_configure.APT_TRUSTED_GPG_DIR + ) for file in files.split(): path = cc_apt_configure.APT_TRUSTED_GPG_DIR + file - keys += class_client.execute(list_cmd + path) or '' + keys += class_client.execute(list_cmd + path) or "" return keys def test_sources_list(self, class_client: IntegrationInstance): @@ -124,8 +124,8 @@ class TestApt: (This is ported from `tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml`.) """ - sources_list = class_client.read_from_file('/etc/apt/sources.list') - assert 6 == len(sources_list.rstrip().split('\n')) + sources_list = class_client.read_from_file("/etc/apt/sources.list") + assert 6 == len(sources_list.rstrip().split("\n")) for expected_re in EXPECTED_REGEXES: assert re.search(expected_re, sources_list) is not None @@ -136,7 +136,7 @@ class TestApt: Ported from tests/cloud_tests/testcases/modules/apt_configure_conf.py """ apt_config = class_client.read_from_file( - '/etc/apt/apt.conf.d/94cloud-init-config' + "/etc/apt/apt.conf.d/94cloud-init-config" ) assert 'Assume-Yes "true";' in apt_config assert 'Fix-Broken "true";' in apt_config @@ -149,40 +149,43 @@ class TestApt: """ release = ImageSpecification.from_os_image().release ppa_path_contents = class_client.read_from_file( - '/etc/apt/sources.list.d/' - 'simplestreams-dev-ubuntu-trunk-{}.list'.format(release) + "/etc/apt/sources.list.d/" + "simplestreams-dev-ubuntu-trunk-{}.list".format(release) ) assert ( - 'http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu' - ) in ppa_path_contents + "http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu" + in ppa_path_contents + ) assert TEST_PPA_KEY in self.get_keys(class_client) def test_signed_by(self, class_client: IntegrationInstance): - """Test the apt signed-by functionality. 
- """ + """Test the apt signed-by functionality.""" release = ImageSpecification.from_os_image().release source = ( "deb [signed-by=/etc/apt/cloud-init.gpg.d/test_signed_by.gpg] " "http://ppa.launchpad.net/juju/stable/ubuntu" - " {} main".format(release)) + " {} main".format(release) + ) path_contents = class_client.read_from_file( - '/etc/apt/sources.list.d/test_signed_by.list') + "/etc/apt/sources.list.d/test_signed_by.list" + ) assert path_contents == source key = class_client.execute( - 'gpg --no-default-keyring --with-fingerprint --list-keys ' - '--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg') + "gpg --no-default-keyring --with-fingerprint --list-keys " + "--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg" + ) assert TEST_SIGNED_BY_KEY in key def test_bad_key(self, class_client: IntegrationInstance): - """Test the apt signed-by functionality. - """ + """Test the apt signed-by functionality.""" with pytest.raises(OSError): class_client.read_from_file( - '/etc/apt/trusted.list.d/test_bad_key.gpg') + "/etc/apt/trusted.list.d/test_bad_key.gpg" + ) def test_key(self, class_client: IntegrationInstance): """Test the apt key functionality. @@ -191,12 +194,13 @@ class TestApt: tests/cloud_tests/testcases/modules/apt_configure_sources_key.py """ test_archive_contents = class_client.read_from_file( - '/etc/apt/sources.list.d/test_key.list' + "/etc/apt/sources.list.d/test_key.list" ) assert ( - 'http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu' - ) in test_archive_contents + "http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu" + in test_archive_contents + ) assert TEST_KEY in self.get_keys(class_client) def test_keyserver(self, class_client: IntegrationInstance): @@ -206,12 +210,13 @@ class TestApt: tests/cloud_tests/testcases/modules/apt_configure_sources_keyserver.py """ test_keyserver_contents = class_client.read_from_file( - '/etc/apt/sources.list.d/test_keyserver.list' + "/etc/apt/sources.list.d/test_keyserver.list" ) assert ( - 'http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu' - ) in test_keyserver_contents + "http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu" + in test_keyserver_contents + ) assert TEST_KEYSERVER_KEY in self.get_keys(class_client) @@ -221,7 +226,7 @@ class TestApt: Ported from tests/cloud_tests/testcases/modules/apt_pipelining_os.py """ conf_exists = class_client.execute( - 'test -f /etc/apt/apt.conf.d/90cloud-init-pipelining' + "test -f /etc/apt/apt.conf.d/90cloud-init-pipelining" ).ok assert conf_exists is False @@ -237,7 +242,7 @@ apt: - arches: - default """ -DEFAULT_DATA = _DEFAULT_DATA.format(uri='') +DEFAULT_DATA = _DEFAULT_DATA.format(uri="") @pytest.mark.ubuntu @@ -249,9 +254,9 @@ class TestDefaults: When no uri is provided. """ - zone = class_client.execute('cloud-init query v1.availability_zone') - sources_list = class_client.read_from_file('/etc/apt/sources.list') - assert '{}.clouds.archive.ubuntu.com'.format(zone) in sources_list + zone = class_client.execute("cloud-init query v1.availability_zone") + sources_list = class_client.read_from_file("/etc/apt/sources.list") + assert "{}.clouds.archive.ubuntu.com".format(zone) in sources_list def test_security(self, class_client: IntegrationInstance): """Test apt default security sources. 
@@ -259,12 +264,12 @@ class TestDefaults: Ported from tests/cloud_tests/testcases/modules/apt_configure_security.py """ - sources_list = class_client.read_from_file('/etc/apt/sources.list') + sources_list = class_client.read_from_file("/etc/apt/sources.list") # 3 lines from main, universe, and multiverse - assert 3 == sources_list.count('deb http://security.ubuntu.com/ubuntu') + assert 3 == sources_list.count("deb http://security.ubuntu.com/ubuntu") assert 3 == sources_list.count( - '# deb-src http://security.ubuntu.com/ubuntu' + "# deb-src http://security.ubuntu.com/ubuntu" ) @@ -280,10 +285,10 @@ def test_default_primary_with_uri(client: IntegrationInstance): Ported from tests/cloud_tests/testcases/modules/apt_configure_primary.py """ - sources_list = client.read_from_file('/etc/apt/sources.list') - assert 'archive.ubuntu.com' not in sources_list + sources_list = client.read_from_file("/etc/apt/sources.list") + assert "archive.ubuntu.com" not in sources_list - assert 'something.random.invalid' in sources_list + assert "something.random.invalid" in sources_list DISABLED_DATA = """\ @@ -310,7 +315,7 @@ class TestDisabled: sources_list = class_client.execute( "cat /etc/apt/sources.list | grep -v '^#'" ).strip() - assert '' == sources_list + assert "" == sources_list def test_disable_apt_pipelining(self, class_client: IntegrationInstance): """Test disabling of apt pipelining. @@ -319,7 +324,7 @@ class TestDisabled: tests/cloud_tests/testcases/modules/apt_pipelining_disable.py """ conf = class_client.read_from_file( - '/etc/apt/apt.conf.d/90cloud-init-pipelining' + "/etc/apt/apt.conf.d/90cloud-init-pipelining" ) assert 'Acquire::http::Pipeline-Depth "0";' in conf @@ -338,8 +343,7 @@ apt: @pytest.mark.user_data(APT_PROXY_DATA) def test_apt_proxy(client: IntegrationInstance): """Test the apt proxy data gets written correctly.""" - out = client.read_from_file( - '/etc/apt/apt.conf.d/90cloud-init-aptproxy') + out = client.read_from_file("/etc/apt/apt.conf.d/90cloud-init-aptproxy") assert 'Acquire::http::Proxy "http://proxy.internal:3128";' in out assert 'Acquire::http::Proxy "http://squid.internal:3128";' in out assert 'Acquire::ftp::Proxy "ftp://squid.internal:3128";' in out diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py index 89c01a9c..d514fc62 100644 --- a/tests/integration_tests/modules/test_ca_certs.py +++ b/tests/integration_tests/modules/test_ca_certs.py @@ -10,7 +10,6 @@ import os.path import pytest - USER_DATA = """\ #cloud-config ca-certs: diff --git a/tests/integration_tests/modules/test_cli.py b/tests/integration_tests/modules/test_cli.py index 3f41b34d..97bfe52d 100644 --- a/tests/integration_tests/modules/test_cli.py +++ b/tests/integration_tests/modules/test_cli.py @@ -7,7 +7,6 @@ import pytest from tests.integration_tests.instances import IntegrationInstance - VALID_USER_DATA = """\ #cloud-config runcmd: @@ -27,9 +26,9 @@ def test_valid_userdata(client: IntegrationInstance): PR #575 """ - result = client.execute('cloud-init devel schema --system') + result = client.execute("cloud-init devel schema --system") assert result.ok - assert 'Valid cloud-config: system userdata' == result.stdout.strip() + assert "Valid cloud-config: system userdata" == result.stdout.strip() @pytest.mark.sru_2020_11 @@ -39,7 +38,7 @@ def test_invalid_userdata(client: IntegrationInstance): PR #575 """ - result = client.execute('cloud-init devel schema --system') + result = client.execute("cloud-init devel schema --system") assert not 
result.ok - assert 'Cloud config schema errors' in result.stderr + assert "Cloud config schema errors" in result.stderr assert 'needs to begin with "#cloud-config"' in result.stderr diff --git a/tests/integration_tests/modules/test_combined.py b/tests/integration_tests/modules/test_combined.py index 26a8397d..c88f40d3 100644 --- a/tests/integration_tests/modules/test_combined.py +++ b/tests/integration_tests/modules/test_combined.py @@ -6,9 +6,10 @@ the same instance launch. Most independent module coherence tests can go here. """ import json -import pytest import re +import pytest + from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import ( @@ -76,7 +77,7 @@ class TestCombined: Also tests LP 1511485: final_message is silent. """ client = class_client - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") expected = ( "This is my final message!\n" r"\d+\.\d+.*\n" @@ -94,10 +95,10 @@ class TestCombined: configuring the archives. """ client = class_client - log = client.read_from_file('/var/log/cloud-init.log') - assert 'W: Failed to fetch' not in log - assert 'W: Some index files failed to download' not in log - assert 'E: Unable to locate package ntp' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "W: Failed to fetch" not in log + assert "W: Some index files failed to download" not in log + assert "E: Unable to locate package ntp" not in log def test_byobu(self, class_client: IntegrationInstance): """Test byobu configured as enabled by default.""" @@ -107,22 +108,18 @@ class TestCombined: def test_configured_locale(self, class_client: IntegrationInstance): """Test locale can be configured correctly.""" client = class_client - default_locale = client.read_from_file('/etc/default/locale') - assert 'LANG=en_GB.UTF-8' in default_locale + default_locale = client.read_from_file("/etc/default/locale") + assert "LANG=en_GB.UTF-8" in default_locale - locale_a = client.execute('locale -a') - verify_ordered_items_in_text([ - 'en_GB.utf8', - 'en_US.utf8' - ], locale_a) + locale_a = client.execute("locale -a") + verify_ordered_items_in_text(["en_GB.utf8", "en_US.utf8"], locale_a) locale_gen = client.execute( "cat /etc/locale.gen | grep -v '^#' | uniq" ) - verify_ordered_items_in_text([ - 'en_GB.UTF-8', - 'en_US.UTF-8' - ], locale_gen) + verify_ordered_items_in_text( + ["en_GB.UTF-8", "en_US.UTF-8"], locale_gen + ) def test_random_seed_data(self, class_client: IntegrationInstance): """Integration test for the random seed module. @@ -141,12 +138,12 @@ class TestCombined: def test_rsyslog(self, class_client: IntegrationInstance): """Test rsyslog is configured correctly.""" client = class_client - assert 'My test log' in client.read_from_file('/var/tmp/rsyslog.log') + assert "My test log" in client.read_from_file("/var/tmp/rsyslog.log") def test_runcmd(self, class_client: IntegrationInstance): """Test runcmd works as expected""" client = class_client - assert 'hello world' == client.read_from_file('/var/tmp/runcmd_output') + assert "hello world" == client.read_from_file("/var/tmp/runcmd_output") @retry(tries=30, delay=1) def test_ssh_import_id(self, class_client: IntegrationInstance): @@ -160,11 +157,10 @@ class TestCombined: /home/ubuntu; this will need modification to run on other OSes. 
""" client = class_client - ssh_output = client.read_from_file( - "/home/ubuntu/.ssh/authorized_keys") + ssh_output = client.read_from_file("/home/ubuntu/.ssh/authorized_keys") - assert '# ssh-import-id gh:powersj' in ssh_output - assert '# ssh-import-id lp:smoser' in ssh_output + assert "# ssh-import-id gh:powersj" in ssh_output + assert "# ssh-import-id lp:smoser" in ssh_output def test_snap(self, class_client: IntegrationInstance): """Integration test for the snap module. @@ -185,21 +181,22 @@ class TestCombined: """ client = class_client timezone_output = client.execute( - 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"') + 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"' + ) assert timezone_output.strip() == "HDT" def test_no_problems(self, class_client: IntegrationInstance): """Test no errors, warnings, or tracebacks""" client = class_client - status_file = client.read_from_file('/run/cloud-init/status.json') - status_json = json.loads(status_file)['v1'] - for stage in ('init', 'init-local', 'modules-config', 'modules-final'): - assert status_json[stage]['errors'] == [] - result_file = client.read_from_file('/run/cloud-init/result.json') - result_json = json.loads(result_file)['v1'] - assert result_json['errors'] == [] - - log = client.read_from_file('/var/log/cloud-init.log') + status_file = client.read_from_file("/run/cloud-init/status.json") + status_json = json.loads(status_file)["v1"] + for stage in ("init", "init-local", "modules-config", "modules-final"): + assert status_json[stage]["errors"] == [] + result_file = client.read_from_file("/run/cloud-init/result.json") + result_json = json.loads(result_file)["v1"] + assert result_json["errors"] == [] + + log = client.read_from_file("/var/log/cloud-init.log") verify_clean_log(log) def test_correct_datasource_detected( @@ -228,73 +225,81 @@ class TestCombined: ) def _check_common_metadata(self, data): - assert data['base64_encoded_keys'] == [] - assert data['merged_cfg'] == 'redacted for non-root user' + assert data["base64_encoded_keys"] == [] + assert data["merged_cfg"] == "redacted for non-root user" image_spec = ImageSpecification.from_os_image() - assert data['sys_info']['dist'][0] == image_spec.os + assert data["sys_info"]["dist"][0] == image_spec.os - v1_data = data['v1'] - assert re.match(r'\d\.\d+\.\d+-\d+', v1_data['kernel_release']) - assert v1_data['variant'] == image_spec.os - assert v1_data['distro'] == image_spec.os - assert v1_data['distro_release'] == image_spec.release - assert v1_data['machine'] == 'x86_64' - assert re.match(r'3.\d\.\d', v1_data['python_version']) + v1_data = data["v1"] + assert re.match(r"\d\.\d+\.\d+-\d+", v1_data["kernel_release"]) + assert v1_data["variant"] == image_spec.os + assert v1_data["distro"] == image_spec.os + assert v1_data["distro_release"] == image_spec.release + assert v1_data["machine"] == "x86_64" + assert re.match(r"3.\d\.\d", v1_data["python_version"]) @pytest.mark.lxd_container def test_instance_json_lxd(self, class_client: IntegrationInstance): client = class_client instance_json_file = client.read_from_file( - '/run/cloud-init/instance-data.json') + "/run/cloud-init/instance-data.json" + ) data = json.loads(instance_json_file) self._check_common_metadata(data) - v1_data = data['v1'] - assert v1_data['cloud_name'] == 'unknown' - assert v1_data['platform'] == 'lxd' - assert v1_data['subplatform'] == ( - 'seed-dir (/var/lib/cloud/seed/nocloud-net)') - assert v1_data['availability_zone'] is None - assert v1_data['instance_id'] == client.instance.name - assert 
v1_data['local_hostname'] == client.instance.name - assert v1_data['region'] is None + v1_data = data["v1"] + assert v1_data["cloud_name"] == "unknown" + assert v1_data["platform"] == "lxd" + assert ( + v1_data["subplatform"] + == "seed-dir (/var/lib/cloud/seed/nocloud-net)" + ) + assert v1_data["availability_zone"] is None + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"] == client.instance.name + assert v1_data["region"] is None @pytest.mark.lxd_vm def test_instance_json_lxd_vm(self, class_client: IntegrationInstance): client = class_client instance_json_file = client.read_from_file( - '/run/cloud-init/instance-data.json') + "/run/cloud-init/instance-data.json" + ) data = json.loads(instance_json_file) self._check_common_metadata(data) - v1_data = data['v1'] - assert v1_data['cloud_name'] == 'unknown' - assert v1_data['platform'] == 'lxd' - assert any([ - '/var/lib/cloud/seed/nocloud-net' in v1_data['subplatform'], - '/dev/sr0' in v1_data['subplatform'] - ]) - assert v1_data['availability_zone'] is None - assert v1_data['instance_id'] == client.instance.name - assert v1_data['local_hostname'] == client.instance.name - assert v1_data['region'] is None + v1_data = data["v1"] + assert v1_data["cloud_name"] == "unknown" + assert v1_data["platform"] == "lxd" + assert any( + [ + "/var/lib/cloud/seed/nocloud-net" in v1_data["subplatform"], + "/dev/sr0" in v1_data["subplatform"], + ] + ) + assert v1_data["availability_zone"] is None + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"] == client.instance.name + assert v1_data["region"] is None @pytest.mark.ec2 def test_instance_json_ec2(self, class_client: IntegrationInstance): client = class_client instance_json_file = client.read_from_file( - '/run/cloud-init/instance-data.json') + "/run/cloud-init/instance-data.json" + ) data = json.loads(instance_json_file) - v1_data = data['v1'] - assert v1_data['cloud_name'] == 'aws' - assert v1_data['platform'] == 'ec2' - assert v1_data['subplatform'].startswith('metadata') - assert v1_data[ - 'availability_zone'] == client.instance.availability_zone - assert v1_data['instance_id'] == client.instance.name - assert v1_data['local_hostname'].startswith('ip-') - assert v1_data['region'] == client.cloud.cloud_instance.region + v1_data = data["v1"] + assert v1_data["cloud_name"] == "aws" + assert v1_data["platform"] == "ec2" + assert v1_data["subplatform"].startswith("metadata") + assert ( + v1_data["availability_zone"] == client.instance.availability_zone + ) + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"].startswith("ip-") + assert v1_data["region"] == client.cloud.cloud_instance.region @pytest.mark.gce def test_instance_json_gce(self, class_client: IntegrationInstance): diff --git a/tests/integration_tests/modules/test_command_output.py b/tests/integration_tests/modules/test_command_output.py index 8429873f..96525cac 100644 --- a/tests/integration_tests/modules/test_command_output.py +++ b/tests/integration_tests/modules/test_command_output.py @@ -8,7 +8,6 @@ import pytest from tests.integration_tests.instances import IntegrationInstance - USER_DATA = """\ #cloud-config output: { all: "| tee -a /var/log/cloud-init-test-output" } @@ -18,5 +17,5 @@ final_message: "should be last line in cloud-init-test-output file" @pytest.mark.user_data(USER_DATA) def test_runcmd(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init-test-output') - assert 'should be last line in 
cloud-init-test-output file' in log + log = client.read_from_file("/var/log/cloud-init-test-output") + assert "should be last line in cloud-init-test-output file" in log diff --git a/tests/integration_tests/modules/test_disk_setup.py b/tests/integration_tests/modules/test_disk_setup.py index 9c9edc46..22277331 100644 --- a/tests/integration_tests/modules/test_disk_setup.py +++ b/tests/integration_tests/modules/test_disk_setup.py @@ -1,25 +1,29 @@ import json import os -import pytest from uuid import uuid4 + +import pytest from pycloudlib.lxd.instance import LXDInstance from cloudinit.subp import subp from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import verify_clean_log -DISK_PATH = '/tmp/test_disk_setup_{}'.format(uuid4()) +DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4()) def setup_and_mount_lxd_disk(instance: LXDInstance): - subp('lxc config device add {} test-disk-setup-disk disk source={}'.format( - instance.name, DISK_PATH).split()) + subp( + "lxc config device add {} test-disk-setup-disk disk source={}".format( + instance.name, DISK_PATH + ).split() + ) @pytest.yield_fixture def create_disk(): # 640k should be enough for anybody - subp('dd if=/dev/zero of={} bs=1k count=640'.format(DISK_PATH).split()) + subp("dd if=/dev/zero of={} bs=1k count=640".format(DISK_PATH).split()) yield os.remove(DISK_PATH) @@ -54,21 +58,21 @@ class TestDeviceAliases: """Test devices aliases work on disk setup/mount""" def test_device_alias(self, create_disk, client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") assert ( - "updated disk_setup device entry 'my_alias' to '/dev/sdb'" - ) in log - assert 'changed my_alias.1 => /dev/sdb1' in log - assert 'changed my_alias.2 => /dev/sdb2' in log + "updated disk_setup device entry 'my_alias' to '/dev/sdb'" in log + ) + assert "changed my_alias.1 => /dev/sdb1" in log + assert "changed my_alias.2 => /dev/sdb2" in log verify_clean_log(log) - lsblk = json.loads(client.execute('lsblk --json')) - sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0] - assert len(sdb['children']) == 2 - assert sdb['children'][0]['name'] == 'sdb1' - assert sdb['children'][0]['mountpoint'] == '/mnt1' - assert sdb['children'][1]['name'] == 'sdb2' - assert sdb['children'][1]['mountpoint'] == '/mnt2' + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 2 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["children"][0]["mountpoint"] == "/mnt1" + assert sdb["children"][1]["name"] == "sdb2" + assert sdb["children"][1]["mountpoint"] == "/mnt2" PARTPROBE_USERDATA = """\ @@ -121,13 +125,13 @@ class TestPartProbeAvailability: def _verify_first_disk_setup(self, client, log): verify_clean_log(log) - lsblk = json.loads(client.execute('lsblk --json')) - sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0] - assert len(sdb['children']) == 2 - assert sdb['children'][0]['name'] == 'sdb1' - assert sdb['children'][0]['mountpoint'] == '/mnt1' - assert sdb['children'][1]['name'] == 'sdb2' - assert sdb['children'][1]['mountpoint'] == '/mnt2' + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 2 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["children"][0]["mountpoint"] == "/mnt1" + assert sdb["children"][1]["name"] == 
"sdb2" + assert sdb["children"][1]["mountpoint"] == "/mnt2" # Not bionic or xenial because the LXD agent gets in the way of us # changing the userdata @@ -148,13 +152,13 @@ class TestPartProbeAvailability: with a warning and a traceback. When partprobe is in use, everything should work successfully. """ - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") self._verify_first_disk_setup(client, log) # Update our userdata and cloud.cfg to mount then perform new disk # setup client.write_to_file( - '/var/lib/cloud/seed/nocloud-net/user-data', + "/var/lib/cloud/seed/nocloud-net/user-data", UPDATED_PARTPROBE_USERDATA, ) client.execute( @@ -162,17 +166,17 @@ class TestPartProbeAvailability: "/etc/cloud/cloud.cfg" ) - client.execute('cloud-init clean --logs') + client.execute("cloud-init clean --logs") client.restart() # Assert new setup works as expected verify_clean_log(log) - lsblk = json.loads(client.execute('lsblk --json')) - sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0] - assert len(sdb['children']) == 1 - assert sdb['children'][0]['name'] == 'sdb1' - assert sdb['children'][0]['mountpoint'] == '/mnt3' + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 1 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["children"][0]["mountpoint"] == "/mnt3" def test_disk_setup_no_partprobe( self, create_disk, client: IntegrationInstance @@ -180,11 +184,11 @@ class TestPartProbeAvailability: """Ensure disk setup still works as expected without partprobe.""" # We can't do this part in a bootcmd because the path has already # been found by the time we get to the bootcmd - client.execute('rm $(which partprobe)') - client.execute('cloud-init clean --logs') + client.execute("rm $(which partprobe)") + client.execute("cloud-init clean --logs") client.restart() - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") self._verify_first_disk_setup(client, log) - assert 'partprobe' not in log + assert "partprobe" not in log diff --git a/tests/integration_tests/modules/test_growpart.py b/tests/integration_tests/modules/test_growpart.py index af1e3a15..67251817 100644 --- a/tests/integration_tests/modules/test_growpart.py +++ b/tests/integration_tests/modules/test_growpart.py @@ -1,22 +1,26 @@ +import json import os -import pytest import pathlib -import json from uuid import uuid4 + +import pytest from pycloudlib.lxd.instance import LXDInstance from cloudinit.subp import subp from tests.integration_tests.instances import IntegrationInstance -DISK_PATH = '/tmp/test_disk_setup_{}'.format(uuid4()) +DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4()) def setup_and_mount_lxd_disk(instance: LXDInstance): - subp('lxc config device add {} test-disk-setup-disk disk source={}'.format( - instance.name, DISK_PATH).split()) + subp( + "lxc config device add {} test-disk-setup-disk disk source={}".format( + instance.name, DISK_PATH + ).split() + ) -@pytest.fixture(scope='class', autouse=True) +@pytest.fixture(scope="class", autouse=True) def create_disk(): """Create 16M sparse file""" pathlib.Path(DISK_PATH).touch() @@ -50,13 +54,15 @@ class TestGrowPart: """Test growpart""" def test_grow_part(self, client: IntegrationInstance): - """Verify """ - log = client.read_from_file('/var/log/cloud-init.log') - assert ("cc_growpart.py[INFO]: '/dev/sdb1' resized:" - " changed (/dev/sdb, 1) from") 
in log - - lsblk = json.loads(client.execute('lsblk --json')) - sdb = [x for x in lsblk['blockdevices'] if x['name'] == 'sdb'][0] - assert len(sdb['children']) == 1 - assert sdb['children'][0]['name'] == 'sdb1' - assert sdb['size'] == '16M' + """Verify""" + log = client.read_from_file("/var/log/cloud-init.log") + assert ( + "cc_growpart.py[INFO]: '/dev/sdb1' resized:" + " changed (/dev/sdb, 1) from" in log + ) + + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 1 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["size"] == "16M" diff --git a/tests/integration_tests/modules/test_hotplug.py b/tests/integration_tests/modules/test_hotplug.py index f5abc86f..0bad761e 100644 --- a/tests/integration_tests/modules/test_hotplug.py +++ b/tests/integration_tests/modules/test_hotplug.py @@ -1,8 +1,9 @@ -import pytest import time -import yaml from collections import namedtuple +import pytest +import yaml + from tests.integration_tests.instances import IntegrationInstance USER_DATA = """\ @@ -12,28 +13,28 @@ updates: when: ['hotplug'] """ -ip_addr = namedtuple('ip_addr', 'interface state ip4 ip6') +ip_addr = namedtuple("ip_addr", "interface state ip4 ip6") def _wait_till_hotplug_complete(client, expected_runs=1): for _ in range(60): - log = client.read_from_file('/var/log/cloud-init.log') - if log.count('Exiting hotplug handler') == expected_runs: + log = client.read_from_file("/var/log/cloud-init.log") + if log.count("Exiting hotplug handler") == expected_runs: return log time.sleep(1) - raise Exception('Waiting for hotplug handler failed') + raise Exception("Waiting for hotplug handler failed") def _get_ip_addr(client): ips = [] - lines = client.execute('ip --brief addr').split('\n') + lines = client.execute("ip --brief addr").split("\n") for line in lines: attributes = line.split() interface, state = attributes[0], attributes[1] ip4_cidr = attributes[2] if len(attributes) > 2 else None ip6_cidr = attributes[3] if len(attributes) > 3 else None - ip4 = ip4_cidr.split('/')[0] if ip4_cidr else None - ip6 = ip6_cidr.split('/')[0] if ip6_cidr else None + ip4 = ip4_cidr.split("/")[0] if ip4_cidr else None + ip6 = ip6_cidr.split("/")[0] if ip6_cidr else None ip = ip_addr(interface, state, ip4, ip6) ips.append(ip) return ips @@ -47,10 +48,10 @@ def _get_ip_addr(client): @pytest.mark.user_data(USER_DATA) def test_hotplug_add_remove(client: IntegrationInstance): ips_before = _get_ip_addr(client) - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Exiting hotplug handler' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Exiting hotplug handler" not in log assert client.execute( - 'test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules' + "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules" ).ok # Add new NIC @@ -62,11 +63,11 @@ def test_hotplug_add_remove(client: IntegrationInstance): assert len(ips_after_add) == len(ips_before) + 1 assert added_ip not in [ip.ip4 for ip in ips_before] assert added_ip in [ip.ip4 for ip in ips_after_add] - assert new_addition.state == 'UP' + assert new_addition.state == "UP" - netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml') + netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml") config = yaml.safe_load(netplan_cfg) - assert new_addition.interface in config['network']['ethernets'] + assert new_addition.interface in config["network"]["ethernets"] # Remove new NIC 
client.instance.remove_network_interface(added_ip) @@ -75,37 +76,37 @@ def test_hotplug_add_remove(client: IntegrationInstance): assert len(ips_after_remove) == len(ips_before) assert added_ip not in [ip.ip4 for ip in ips_after_remove] - netplan_cfg = client.read_from_file('/etc/netplan/50-cloud-init.yaml') + netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml") config = yaml.safe_load(netplan_cfg) - assert new_addition.interface not in config['network']['ethernets'] + assert new_addition.interface not in config["network"]["ethernets"] - assert 'enabled' == client.execute( - 'cloud-init devel hotplug-hook -s net query' + assert "enabled" == client.execute( + "cloud-init devel hotplug-hook -s net query" ) @pytest.mark.openstack def test_no_hotplug_in_userdata(client: IntegrationInstance): ips_before = _get_ip_addr(client) - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Exiting hotplug handler' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Exiting hotplug handler" not in log assert client.execute( - 'test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules' + "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules" ).failed # Add new NIC client.instance.add_network_interface() - log = client.read_from_file('/var/log/cloud-init.log') - assert 'hotplug-hook' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "hotplug-hook" not in log ips_after_add = _get_ip_addr(client) if len(ips_after_add) == len(ips_before) + 1: # We can see the device, but it should not have been brought up new_ip = [ip for ip in ips_after_add if ip not in ips_before][0] - assert new_ip.state == 'DOWN' + assert new_ip.state == "DOWN" else: assert len(ips_after_add) == len(ips_before) - assert 'disabled' == client.execute( - 'cloud-init devel hotplug-hook -s net query' + assert "disabled" == client.execute( + "cloud-init devel hotplug-hook -s net query" ) diff --git a/tests/integration_tests/modules/test_jinja_templating.py b/tests/integration_tests/modules/test_jinja_templating.py index fe8eff1a..7788c6f0 100644 --- a/tests/integration_tests/modules/test_jinja_templating.py +++ b/tests/integration_tests/modules/test_jinja_templating.py @@ -4,7 +4,6 @@ import pytest from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import verify_ordered_items_in_text - USER_DATA = """\ ## template: jinja #cloud-config @@ -23,12 +22,12 @@ def test_runcmd_with_variable_substitution(client: IntegrationInstance): we can also substitute variables from instance-data-sensitive LP: #1931392. 
""" - hostname = client.execute('hostname').stdout.strip() + hostname = client.execute("hostname").stdout.strip() expected = [ hostname, - ('Merged cloud-init system config from /etc/cloud/cloud.cfg and ' - '/etc/cloud/cloud.cfg.d/'), - hostname + "Merged cloud-init system config from /etc/cloud/cloud.cfg and " + "/etc/cloud/cloud.cfg.d/", + hostname, ] - output = client.read_from_file('/var/tmp/runcmd_output') + output = client.read_from_file("/var/tmp/runcmd_output") verify_ordered_items_in_text(expected, output) diff --git a/tests/integration_tests/modules/test_keys_to_console.py b/tests/integration_tests/modules/test_keys_to_console.py index e79db3c7..50899982 100644 --- a/tests/integration_tests/modules/test_keys_to_console.py +++ b/tests/integration_tests/modules/test_keys_to_console.py @@ -36,6 +36,7 @@ users: @pytest.mark.user_data(BLACKLIST_USER_DATA) class TestKeysToConsoleBlacklist: """Test that the blacklist options work as expected.""" + @pytest.mark.parametrize("key_type", ["DSA", "ECDSA"]) def test_excluded_keys(self, class_client, key_type): syslog = class_client.read_from_file("/var/log/syslog") @@ -55,6 +56,7 @@ class TestAllKeysToConsoleBlacklist: """Test that when key blacklist contains all key types that no header/footer are output. """ + def test_header_excluded(self, class_client): syslog = class_client.read_from_file("/var/log/syslog") assert "BEGIN SSH HOST KEY FINGERPRINTS" not in syslog @@ -67,6 +69,7 @@ class TestAllKeysToConsoleBlacklist: @pytest.mark.user_data(DISABLED_USER_DATA) class TestKeysToConsoleDisabled: """Test that output can be fully disabled.""" + @pytest.mark.parametrize("key_type", ["DSA", "ECDSA", "ED25519", "RSA"]) def test_keys_excluded(self, class_client, key_type): syslog = class_client.read_from_file("/var/log/syslog") @@ -90,7 +93,7 @@ class TestKeysToConsoleEnabled: """Test that output can be enabled disabled.""" def test_duplicate_messaging_console_log(self, class_client): - class_client.execute('cloud-init status --wait --long').ok + class_client.execute("cloud-init status --wait --long").ok try: console_log = class_client.instance.console_log() except NotImplementedError: @@ -98,13 +101,13 @@ class TestKeysToConsoleEnabled: # log pytest.skip("NotImplementedError when requesting console log") return - if console_log.lower() == 'no console output': + if console_log.lower() == "no console output": # This test retries because we might not have the full console log # on the first fetch. However, if we have no console output # at all, we don't want to keep retrying as that would trigger # another 5 minute wait on the pycloudlib side, which could # leave us waiting for a couple hours - pytest.fail('no console output') + pytest.fail("no console output") return msg = "no authorized SSH keys fingerprints found for user barfoo." 
assert 1 == console_log.count(msg) diff --git a/tests/integration_tests/modules/test_lxd_bridge.py b/tests/integration_tests/modules/test_lxd_bridge.py index 65dce3c7..2cb3f4f3 100644 --- a/tests/integration_tests/modules/test_lxd_bridge.py +++ b/tests/integration_tests/modules/test_lxd_bridge.py @@ -8,7 +8,6 @@ import yaml from tests.integration_tests.util import verify_clean_log - USER_DATA = """\ #cloud-config lxd: @@ -29,7 +28,6 @@ lxd: @pytest.mark.no_container @pytest.mark.user_data(USER_DATA) class TestLxdBridge: - @pytest.mark.parametrize("binary_name", ["lxc", "lxd"]) def test_binaries_installed(self, class_client, binary_name): """Check that the expected LXD binaries are installed""" diff --git a/tests/integration_tests/modules/test_ntp_servers.py b/tests/integration_tests/modules/test_ntp_servers.py index c777a641..fc62e63b 100644 --- a/tests/integration_tests/modules/test_ntp_servers.py +++ b/tests/integration_tests/modules/test_ntp_servers.py @@ -9,8 +9,8 @@ and ``tests/cloud_tests/testcases/modules/ntp_chrony.yaml``) """ import re -import yaml import pytest +import yaml from tests.integration_tests.instances import IntegrationInstance @@ -33,13 +33,13 @@ EXPECTED_POOLS = yaml.safe_load(USER_DATA)["ntp"]["pools"] @pytest.mark.user_data(USER_DATA) class TestNtpServers: - def test_ntp_installed(self, class_client: IntegrationInstance): """Test that `ntpd --version` succeeds, indicating installation.""" assert class_client.execute("ntpd --version").ok - def test_dist_config_file_is_empty(self, - class_client: IntegrationInstance): + def test_dist_config_file_is_empty( + self, class_client: IntegrationInstance + ): """Test that the distributed config file is empty. (This test is skipped on all currently supported Ubuntu releases, so @@ -56,13 +56,13 @@ class TestNtpServers: assert re.search( r"^server {} iburst".format(expected_server), ntp_conf, - re.MULTILINE + re.MULTILINE, ) for expected_pool in EXPECTED_POOLS: assert re.search( r"^pool {} iburst".format(expected_pool), ntp_conf, - re.MULTILINE + re.MULTILINE, ) def test_ntpq_servers(self, class_client: IntegrationInstance): @@ -84,12 +84,12 @@ ntp: @pytest.mark.user_data(CHRONY_DATA) def test_chrony(client: IntegrationInstance): - if client.execute('test -f /etc/chrony.conf').ok: - chrony_conf = '/etc/chrony.conf' + if client.execute("test -f /etc/chrony.conf").ok: + chrony_conf = "/etc/chrony.conf" else: - chrony_conf = '/etc/chrony/chrony.conf' + chrony_conf = "/etc/chrony/chrony.conf" contents = client.read_from_file(chrony_conf) - assert 'server 172.16.15.14' in contents + assert "server 172.16.15.14" in contents TIMESYNCD_DATA = """\ @@ -105,9 +105,9 @@ ntp: @pytest.mark.user_data(TIMESYNCD_DATA) def test_timesyncd(client: IntegrationInstance): contents = client.read_from_file( - '/etc/systemd/timesyncd.conf.d/cloud-init.conf' + "/etc/systemd/timesyncd.conf.d/cloud-init.conf" ) - assert 'NTP=172.16.15.14' in contents + assert "NTP=172.16.15.14" in contents EMPTY_NTP = """\ @@ -121,8 +121,8 @@ ntp: @pytest.mark.user_data(EMPTY_NTP) def test_empty_ntp(client: IntegrationInstance): - assert client.execute('ntpd --version').ok - assert client.execute('test -f /etc/ntp.conf.dist').failed - assert 'pool.ntp.org iburst' in client.execute( + assert client.execute("ntpd --version").ok + assert client.execute("test -f /etc/ntp.conf.dist").failed + assert "pool.ntp.org iburst" in client.execute( 'grep -v "^#" /etc/ntp.conf' ) diff --git a/tests/integration_tests/modules/test_package_update_upgrade_install.py 
b/tests/integration_tests/modules/test_package_update_upgrade_install.py index 28d741bc..d668d81c 100644 --- a/tests/integration_tests/modules/test_package_update_upgrade_install.py +++ b/tests/integration_tests/modules/test_package_update_upgrade_install.py @@ -13,8 +13,8 @@ NOTE: the testcase for this looks for the command in history.log as """ import re -import pytest +import pytest USER_DATA = """\ #cloud-config @@ -29,7 +29,6 @@ package_upgrade: true @pytest.mark.ubuntu @pytest.mark.user_data(USER_DATA) class TestPackageUpdateUpgradeInstall: - def assert_package_installed(self, pkg_out, name, version=None): """Check dpkg-query --show output for matching package name. @@ -38,7 +37,8 @@ class TestPackageUpdateUpgradeInstall: version. """ pkg_match = re.search( - "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE) + "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE + ) if pkg_match: installed_version = pkg_match.group("version") if not version: @@ -46,8 +46,10 @@ class TestPackageUpdateUpgradeInstall: if installed_version.startswith(version): return # Success raise AssertionError( - "Expected package version %s-%s not found. Found %s" % - name, version, installed_version) + "Expected package version %s-%s not found. Found %s" % name, + version, + installed_version, + ) raise AssertionError("Package not installed: %s" % name) def test_new_packages_are_installed(self, class_client): @@ -58,11 +60,13 @@ class TestPackageUpdateUpgradeInstall: def test_packages_were_updated(self, class_client): out = class_client.execute( - "grep ^Commandline: /var/log/apt/history.log") + "grep ^Commandline: /var/log/apt/history.log" + ) assert ( "Commandline: /usr/bin/apt-get --option=Dpkg::Options" "::=--force-confold --option=Dpkg::options::=--force-unsafe-io " - "--assume-yes --quiet install sl tree") in out + "--assume-yes --quiet install sl tree" in out + ) def test_packages_were_upgraded(self, class_client): """Test cloud-init-output for install & upgrade stuff.""" diff --git a/tests/integration_tests/modules/test_persistence.py b/tests/integration_tests/modules/test_persistence.py index 00fdeaea..33527e1e 100644 --- a/tests/integration_tests/modules/test_persistence.py +++ b/tests/integration_tests/modules/test_persistence.py @@ -10,21 +10,23 @@ from tests.integration_tests.util import ( verify_ordered_items_in_text, ) - -PICKLE_PATH = Path('/var/lib/cloud/instance/obj.pkl') -TEST_PICKLE = ASSETS_DIR / 'trusty_with_mime.pkl' +PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl") +TEST_PICKLE = ASSETS_DIR / "trusty_with_mime.pkl" @pytest.mark.lxd_container def test_log_message_on_missing_version_file(client: IntegrationInstance): client.push_file(TEST_PICKLE, PICKLE_PATH) client.restart() - assert client.execute('cloud-init status --wait').ok - log = client.read_from_file('/var/log/cloud-init.log') - verify_ordered_items_in_text([ - "Unable to unpickle datasource: 'MIMEMultipart' object has no " - "attribute 'policy'. Ignoring current cache.", - 'no cache found', - 'Searching for local data source', - 'SUCCESS: found local data from DataSourceNoCloud' - ], log) + assert client.execute("cloud-init status --wait").ok + log = client.read_from_file("/var/log/cloud-init.log") + verify_ordered_items_in_text( + [ + "Unable to unpickle datasource: 'MIMEMultipart' object has no " + "attribute 'policy'. 
Ignoring current cache.", + "no cache found", + "Searching for local data source", + "SUCCESS: found local data from DataSourceNoCloud", + ], + log, + ) diff --git a/tests/integration_tests/modules/test_power_state_change.py b/tests/integration_tests/modules/test_power_state_change.py index 5f3a32ac..a629029d 100644 --- a/tests/integration_tests/modules/test_power_state_change.py +++ b/tests/integration_tests/modules/test_power_state_change.py @@ -30,7 +30,7 @@ def _detect_reboot(instance: IntegrationInstance): instance.instance.wait() for _ in range(600): try: - log = instance.read_from_file('/var/log/cloud-init.log') + log = instance.read_from_file("/var/log/cloud-init.log") boot_count = log.count("running 'init-local'") if boot_count == 1: instance.instance.wait() @@ -40,11 +40,11 @@ def _detect_reboot(instance: IntegrationInstance): pass time.sleep(1) else: - raise Exception('Could not detect reboot') + raise Exception("Could not detect reboot") def _can_connect(instance): - return instance.execute('true').ok + return instance.execute("true").ok # This test is marked unstable because even though it should be able to @@ -55,36 +55,44 @@ def _can_connect(instance): @pytest.mark.ubuntu @pytest.mark.lxd_container class TestPowerChange: - @pytest.mark.parametrize('mode,delay,timeout,expected', [ - ('poweroff', 'now', '10', 'will execute: shutdown -P now msg'), - ('reboot', 'now', '0', 'will execute: shutdown -r now msg'), - ('halt', '+1', '0', 'will execute: shutdown -H +1 msg'), - ]) - def test_poweroff(self, session_cloud: IntegrationCloud, - mode, delay, timeout, expected): + @pytest.mark.parametrize( + "mode,delay,timeout,expected", + [ + ("poweroff", "now", "10", "will execute: shutdown -P now msg"), + ("reboot", "now", "0", "will execute: shutdown -r now msg"), + ("halt", "+1", "0", "will execute: shutdown -H +1 msg"), + ], + ) + def test_poweroff( + self, session_cloud: IntegrationCloud, mode, delay, timeout, expected + ): with session_cloud.launch( user_data=USER_DATA.format( - delay=delay, mode=mode, timeout=timeout, condition='true'), - launch_kwargs={'wait': False}, + delay=delay, mode=mode, timeout=timeout, condition="true" + ), + launch_kwargs={"wait": False}, ) as instance: - if mode == 'reboot': + if mode == "reboot": _detect_reboot(instance) else: instance.instance.wait_for_stop() instance.instance.start(wait=True) - log = instance.read_from_file('/var/log/cloud-init.log') + log = instance.read_from_file("/var/log/cloud-init.log") assert _can_connect(instance) lines_to_check = [ - 'Running module power-state-change', + "Running module power-state-change", expected, "running 'init-local'", - 'config-power-state-change already ran', + "config-power-state-change already ran", ] verify_ordered_items_in_text(lines_to_check, log) - @pytest.mark.user_data(USER_DATA.format(delay='0', mode='poweroff', - timeout='0', condition='false')) + @pytest.mark.user_data( + USER_DATA.format( + delay="0", mode="poweroff", timeout="0", condition="false" + ) + ) def test_poweroff_false_condition(self, client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") assert _can_connect(client) - assert 'Condition was false. Will not perform state change' in log + assert "Condition was false. 
Will not perform state change" in log diff --git a/tests/integration_tests/modules/test_puppet.py b/tests/integration_tests/modules/test_puppet.py index f40a6ca3..1bd9cee4 100644 --- a/tests/integration_tests/modules/test_puppet.py +++ b/tests/integration_tests/modules/test_puppet.py @@ -15,9 +15,9 @@ puppet: @pytest.mark.user_data(SERVICE_DATA) def test_puppet_service(client: IntegrationInstance): """Basic test that puppet gets installed and runs.""" - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") verify_clean_log(log) - assert client.execute('systemctl is-active puppet').ok + assert client.execute("systemctl is-active puppet").ok assert "Running command ['puppet', 'agent'" not in log @@ -35,5 +35,5 @@ puppet: @pytest.mark.user_data(EXEC_DATA) def test_pupet_exec(client: IntegrationInstance): """Basic test that puppet gets installed and runs.""" - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") assert "Running command ['puppet', 'agent', '--noop']" in log diff --git a/tests/integration_tests/modules/test_set_hostname.py b/tests/integration_tests/modules/test_set_hostname.py index e7f7f6b6..ae0aeae9 100644 --- a/tests/integration_tests/modules/test_set_hostname.py +++ b/tests/integration_tests/modules/test_set_hostname.py @@ -11,7 +11,6 @@ after the system is boot. import pytest - USER_DATA_HOSTNAME = """\ #cloud-config hostname: cloudinit2 @@ -34,7 +33,6 @@ fqdn: cloudinit2.test.io @pytest.mark.ci class TestHostname: - @pytest.mark.user_data(USER_DATA_HOSTNAME) def test_hostname(self, client): hostname_output = client.execute("hostname") @@ -59,6 +57,8 @@ class TestHostname: assert "cloudinit2.i9n.cloud-init.io" in fqdn_output.strip() host_output = client.execute("grep ^127 /etc/hosts") - assert '127.0.1.1 {} {}'.format( - fqdn_output, hostname_output) in host_output - assert '127.0.0.1 localhost' in host_output + assert ( + "127.0.1.1 {} {}".format(fqdn_output, hostname_output) + in host_output + ) + assert "127.0.0.1 localhost" in host_output diff --git a/tests/integration_tests/modules/test_set_password.py b/tests/integration_tests/modules/test_set_password.py index ac9db19d..e0f8b692 100644 --- a/tests/integration_tests/modules/test_set_password.py +++ b/tests/integration_tests/modules/test_set_password.py @@ -15,7 +15,6 @@ import yaml from tests.integration_tests.util import retry - COMMON_USER_DATA = """\ #cloud-config ssh_pwauth: yes @@ -42,7 +41,9 @@ Uh69tP4GSrGW5XKHxMLiKowJgm/" lock_passwd: false """ -LIST_USER_DATA = COMMON_USER_DATA + """ +LIST_USER_DATA = ( + COMMON_USER_DATA + + """ chpasswd: list: - tom:mypassword123! @@ -50,8 +51,11 @@ chpasswd: - harry:RANDOM - mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89 """ +) -STRING_USER_DATA = COMMON_USER_DATA + """ +STRING_USER_DATA = ( + COMMON_USER_DATA + + """ chpasswd: list: | tom:mypassword123! @@ -59,6 +63,7 @@ chpasswd: harry:RANDOM mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89 """ +) USERS_DICTS = yaml.safe_load(COMMON_USER_DATA)["users"] USERS_PASSWD_VALUES = { @@ -141,13 +146,13 @@ class Mixin: # log pytest.skip("NotImplementedError when requesting console log") return - if console_log.lower() == 'no console output': + if console_log.lower() == "no console output": # This test retries because we might not have the full console log # on the first fetch. 
However, if we have no console output # at all, we don't want to keep retrying as that would trigger # another 5 minute wait on the pycloudlib side, which could # leave us waiting for a couple hours - pytest.fail('no console output') + pytest.fail("no console output") return assert "dick:" in console_log assert "harry:" in console_log diff --git a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py index cf14d0b0..89b49576 100644 --- a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py +++ b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py @@ -14,7 +14,6 @@ import pytest from tests.integration_tests.util import retry - USER_DATA_SSH_AUTHKEY_DISABLE = """\ #cloud-config no_ssh_fingerprints: true @@ -32,13 +31,13 @@ ssh_authorized_keys: @pytest.mark.ci class TestSshAuthkeyFingerprints: - @pytest.mark.user_data(USER_DATA_SSH_AUTHKEY_DISABLE) def test_ssh_authkey_fingerprints_disable(self, client): cloudinit_output = client.read_from_file("/var/log/cloud-init.log") assert ( "Skipping module named ssh-authkey-fingerprints, " - "logging of SSH fingerprints disabled") in cloudinit_output + "logging of SSH fingerprints disabled" in cloudinit_output + ) # retry decorator here because it can take some time to be reflected # in syslog @@ -47,7 +46,7 @@ class TestSshAuthkeyFingerprints: def test_ssh_authkey_fingerprints_enable(self, client): syslog_output = client.read_from_file("/var/log/syslog") - assert re.search(r'256 SHA256:.*(ECDSA)', syslog_output) is not None - assert re.search(r'256 SHA256:.*(ED25519)', syslog_output) is not None - assert re.search(r'1024 SHA256:.*(DSA)', syslog_output) is None - assert re.search(r'2048 SHA256:.*(RSA)', syslog_output) is None + assert re.search(r"256 SHA256:.*(ECDSA)", syslog_output) is not None + assert re.search(r"256 SHA256:.*(ED25519)", syslog_output) is not None + assert re.search(r"1024 SHA256:.*(DSA)", syslog_output) is None + assert re.search(r"2048 SHA256:.*(RSA)", syslog_output) is None diff --git a/tests/integration_tests/modules/test_ssh_generate.py b/tests/integration_tests/modules/test_ssh_generate.py index 60c36982..1dd0adf1 100644 --- a/tests/integration_tests/modules/test_ssh_generate.py +++ b/tests/integration_tests/modules/test_ssh_generate.py @@ -10,7 +10,6 @@ keys were created. 
import pytest - USER_DATA = """\ #cloud-config ssh_genkeytypes: @@ -23,28 +22,27 @@ authkey_hash: sha512 @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestSshKeysGenerate: - @pytest.mark.parametrize( - "ssh_key_path", ( + "ssh_key_path", + ( "/etc/ssh/ssh_host_dsa_key.pub", "/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_rsa_key.pub", "/etc/ssh/ssh_host_rsa_key", - ) + ), ) def test_ssh_keys_not_generated(self, ssh_key_path, class_client): - out = class_client.execute( - "test -e {}".format(ssh_key_path) - ) + out = class_client.execute("test -e {}".format(ssh_key_path)) assert out.failed @pytest.mark.parametrize( - "ssh_key_path", ( + "ssh_key_path", + ( "/etc/ssh/ssh_host_ecdsa_key.pub", "/etc/ssh/ssh_host_ecdsa_key", "/etc/ssh/ssh_host_ed25519_key.pub", "/etc/ssh/ssh_host_ed25519_key", - ) + ), ) def test_ssh_keys_generated(self, ssh_key_path, class_client): out = class_client.read_from_file(ssh_key_path) diff --git a/tests/integration_tests/modules/test_ssh_keys_provided.py b/tests/integration_tests/modules/test_ssh_keys_provided.py index 6aae96ae..b79f18eb 100644 --- a/tests/integration_tests/modules/test_ssh_keys_provided.py +++ b/tests/integration_tests/modules/test_ssh_keys_provided.py @@ -9,7 +9,6 @@ system. import pytest - USER_DATA = """\ #cloud-config disable_root: false @@ -82,44 +81,33 @@ ssh_keys: @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestSshKeysProvided: - @pytest.mark.parametrize( "config_path,expected_out", ( ( "/etc/ssh/ssh_host_dsa_key.pub", - ( - "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R" - "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM" - ), + "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R" + "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM", ), ( "/etc/ssh/ssh_host_dsa_key", - ( - "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr" - "hOVAfzZ6+jklP" - ), + "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr" + "hOVAfzZ6+jklP", ), ( "/etc/ssh/ssh_host_rsa_key.pub", - ( - "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT" - "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4" - ), + "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT" + "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4", ), ( "/etc/ssh/ssh_host_rsa_key", - ( - "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un" - "RQvLZpMRdywBm" - ), + "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un" + "RQvLZpMRdywBm", ), ( "/etc/ssh/ssh_host_rsa_key-cert.pub", - ( - "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg" - "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD" - ), + "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg" + "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD", ), ( "/etc/ssh/sshd_config", @@ -127,33 +115,25 @@ class TestSshKeysProvided: ), ( "/etc/ssh/ssh_host_ecdsa_key.pub", - ( - "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB" - "BBFsS5Tvky/IC/dXhE/afxxU" - ), + "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB" + "BBFsS5Tvky/IC/dXhE/afxxU", ), ( "/etc/ssh/ssh_host_ecdsa_key", - ( - "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY" - "5mpZqxgX4vcgb" - ), + "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY" + "5mpZqxgX4vcgb", ), ( "/etc/ssh/ssh_host_ed25519_key.pub", - ( - "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6" - "G15dqjQ2XkNVOEnb5" - ), + "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6" + "G15dqjQ2XkNVOEnb5", ), ( "/etc/ssh/ssh_host_ed25519_key", - ( - "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT" - "OhteXao0Nl5DVThJ2+Q" - ), + "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT" + "OhteXao0Nl5DVThJ2+Q", ), - ) + ), ) def 
test_ssh_provided_keys(self, config_path, expected_out, class_client): out = class_client.read_from_file(config_path).strip() diff --git a/tests/integration_tests/modules/test_ssh_keysfile.py b/tests/integration_tests/modules/test_ssh_keysfile.py index b39454e6..8330a1ce 100644 --- a/tests/integration_tests/modules/test_ssh_keysfile.py +++ b/tests/integration_tests/modules/test_ssh_keysfile.py @@ -1,15 +1,16 @@ +from io import StringIO + import paramiko import pytest -from io import StringIO from paramiko.ssh_exception import SSHException from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import get_test_rsa_keypair -TEST_USER1_KEYS = get_test_rsa_keypair('test1') -TEST_USER2_KEYS = get_test_rsa_keypair('test2') -TEST_DEFAULT_KEYS = get_test_rsa_keypair('test3') +TEST_USER1_KEYS = get_test_rsa_keypair("test1") +TEST_USER2_KEYS = get_test_rsa_keypair("test2") +TEST_DEFAULT_KEYS = get_test_rsa_keypair("test3") _USERDATA = """\ #cloud-config @@ -26,7 +27,7 @@ users: ssh_authorized_keys: - {user2} """.format( - bootcmd='{bootcmd}', + bootcmd="{bootcmd}", default=TEST_DEFAULT_KEYS.public_key, user1=TEST_USER1_KEYS.public_key, user2=TEST_USER2_KEYS.public_key, @@ -37,9 +38,9 @@ def common_verify(client, expected_keys): for user, filename, keys in expected_keys: # Ensure key is in the key file contents = client.read_from_file(filename) - if user in ['ubuntu', 'root']: - lines = contents.split('\n') - if user == 'root': + if user in ["ubuntu", "root"]: + lines = contents.split("\n") + if user == "root": # Our personal public key gets added by pycloudlib in # addition to the default `ssh_authorized_keys` assert len(lines) == 2 @@ -54,8 +55,9 @@ def common_verify(client, expected_keys): # Ensure we can actually connect ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - paramiko_key = paramiko.RSAKey.from_private_key(StringIO( - keys.private_key)) + paramiko_key = paramiko.RSAKey.from_private_key( + StringIO(keys.private_key) + ) # Will fail with AuthenticationException if # we cannot connect @@ -71,8 +73,11 @@ def common_verify(client, expected_keys): other_users = [u[0] for u in expected_keys if u[2] != keys] for other_user in other_users: with pytest.raises(SSHException): - print('trying to connect as {} with key from {}'.format( - other_user, user)) + print( + "trying to connect as {} with key from {}".format( + other_user, user + ) + ) ssh.connect( client.instance.ip, username=other_user, @@ -83,37 +88,38 @@ def common_verify(client, expected_keys): # Ensure we haven't messed with any /home permissions # See LP: #1940233 - home_dir = '/home/{}'.format(user) + home_dir = "/home/{}".format(user) # Home permissions aren't consistent between releases. On ubuntu # this can change to 750 once focal is unsupported. 
if ImageSpecification.from_os_image().release in ("bionic", "focal"): - home_perms = '755' + home_perms = "755" else: - home_perms = '750' - if user == 'root': - home_dir = '/root' - home_perms = '700' - assert '{} {}'.format(user, home_perms) == client.execute( + home_perms = "750" + if user == "root": + home_dir = "/root" + home_perms = "700" + assert "{} {}".format(user, home_perms) == client.execute( 'stat -c "%U %a" {}'.format(home_dir) ) if client.execute("test -d {}/.ssh".format(home_dir)).ok: - assert '{} 700'.format(user) == client.execute( + assert "{} 700".format(user) == client.execute( 'stat -c "%U %a" {}/.ssh'.format(home_dir) ) - assert '{} 600'.format(user) == client.execute( + assert "{} 600".format(user) == client.execute( 'stat -c "%U %a" {}'.format(filename) ) # Also ensure ssh-keygen works as expected - client.execute('mkdir {}/.ssh'.format(home_dir)) + client.execute("mkdir {}/.ssh".format(home_dir)) assert client.execute( "ssh-keygen -b 2048 -t rsa -f {}/.ssh/id_rsa -q -N ''".format( - home_dir) + home_dir + ) ).ok - assert client.execute('test -f {}/.ssh/id_rsa'.format(home_dir)) - assert client.execute('test -f {}/.ssh/id_rsa.pub'.format(home_dir)) + assert client.execute("test -f {}/.ssh/id_rsa".format(home_dir)) + assert client.execute("test -f {}/.ssh/id_rsa.pub".format(home_dir)) - assert 'root 755' == client.execute('stat -c "%U %a" /home') + assert "root 755" == client.execute('stat -c "%U %a" /home') DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""') @@ -123,75 +129,96 @@ DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""') @pytest.mark.user_data(DEFAULT_KEYS_USERDATA) def test_authorized_keys_default(client: IntegrationInstance): expected_keys = [ - ('test_user1', '/home/test_user1/.ssh/authorized_keys', - TEST_USER1_KEYS), - ('test_user2', '/home/test_user2/.ssh/authorized_keys', - TEST_USER2_KEYS), - ('ubuntu', '/home/ubuntu/.ssh/authorized_keys', - TEST_DEFAULT_KEYS), - ('root', '/root/.ssh/authorized_keys', TEST_DEFAULT_KEYS), + ( + "test_user1", + "/home/test_user1/.ssh/authorized_keys", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/home/test_user2/.ssh/authorized_keys", + TEST_USER2_KEYS, + ), + ("ubuntu", "/home/ubuntu/.ssh/authorized_keys", TEST_DEFAULT_KEYS), + ("root", "/root/.ssh/authorized_keys", TEST_DEFAULT_KEYS), ] common_verify(client, expected_keys) -AUTHORIZED_KEYS2_USERDATA = _USERDATA.format(bootcmd=( - "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " - "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' " - "/etc/ssh/sshd_config")) +AUTHORIZED_KEYS2_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' " + "/etc/ssh/sshd_config" + ) +) @pytest.mark.ubuntu @pytest.mark.user_data(AUTHORIZED_KEYS2_USERDATA) def test_authorized_keys2(client: IntegrationInstance): expected_keys = [ - ('test_user1', '/home/test_user1/.ssh/authorized_keys2', - TEST_USER1_KEYS), - ('test_user2', '/home/test_user2/.ssh/authorized_keys2', - TEST_USER2_KEYS), - ('ubuntu', '/home/ubuntu/.ssh/authorized_keys2', - TEST_DEFAULT_KEYS), - ('root', '/root/.ssh/authorized_keys2', TEST_DEFAULT_KEYS), + ( + "test_user1", + "/home/test_user1/.ssh/authorized_keys2", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/home/test_user2/.ssh/authorized_keys2", + TEST_USER2_KEYS, + ), + ("ubuntu", "/home/ubuntu/.ssh/authorized_keys2", TEST_DEFAULT_KEYS), + ("root", "/root/.ssh/authorized_keys2", TEST_DEFAULT_KEYS), ] common_verify(client, expected_keys) -NESTED_KEYS_USERDATA 
= _USERDATA.format(bootcmd=( - "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " - "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' " - "/etc/ssh/sshd_config")) +NESTED_KEYS_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' " + "/etc/ssh/sshd_config" + ) +) @pytest.mark.ubuntu @pytest.mark.user_data(NESTED_KEYS_USERDATA) def test_nested_keys(client: IntegrationInstance): expected_keys = [ - ('test_user1', '/home/test_user1/foo/bar/ssh/keys', - TEST_USER1_KEYS), - ('test_user2', '/home/test_user2/foo/bar/ssh/keys', - TEST_USER2_KEYS), - ('ubuntu', '/home/ubuntu/foo/bar/ssh/keys', - TEST_DEFAULT_KEYS), - ('root', '/root/foo/bar/ssh/keys', TEST_DEFAULT_KEYS), + ("test_user1", "/home/test_user1/foo/bar/ssh/keys", TEST_USER1_KEYS), + ("test_user2", "/home/test_user2/foo/bar/ssh/keys", TEST_USER2_KEYS), + ("ubuntu", "/home/ubuntu/foo/bar/ssh/keys", TEST_DEFAULT_KEYS), + ("root", "/root/foo/bar/ssh/keys", TEST_DEFAULT_KEYS), ] common_verify(client, expected_keys) -EXTERNAL_KEYS_USERDATA = _USERDATA.format(bootcmd=( - "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " - "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' " - "/etc/ssh/sshd_config")) +EXTERNAL_KEYS_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' " + "/etc/ssh/sshd_config" + ) +) @pytest.mark.ubuntu @pytest.mark.user_data(EXTERNAL_KEYS_USERDATA) def test_external_keys(client: IntegrationInstance): expected_keys = [ - ('test_user1', '/etc/ssh/authorized_keys/test_user1/keys', - TEST_USER1_KEYS), - ('test_user2', '/etc/ssh/authorized_keys/test_user2/keys', - TEST_USER2_KEYS), - ('ubuntu', '/etc/ssh/authorized_keys/ubuntu/keys', - TEST_DEFAULT_KEYS), - ('root', '/etc/ssh/authorized_keys/root/keys', TEST_DEFAULT_KEYS), + ( + "test_user1", + "/etc/ssh/authorized_keys/test_user1/keys", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/etc/ssh/authorized_keys/test_user2/keys", + TEST_USER2_KEYS, + ), + ("ubuntu", "/etc/ssh/authorized_keys/ubuntu/keys", TEST_DEFAULT_KEYS), + ("root", "/etc/ssh/authorized_keys/root/keys", TEST_DEFAULT_KEYS), ] common_verify(client, expected_keys) diff --git a/tests/integration_tests/modules/test_user_events.py b/tests/integration_tests/modules/test_user_events.py index fffa0746..e4a4241f 100644 --- a/tests/integration_tests/modules/test_user_events.py +++ b/tests/integration_tests/modules/test_user_events.py @@ -3,8 +3,9 @@ This is currently limited to applying network config on BOOT events. 
""" -import pytest import re + +import pytest import yaml from tests.integration_tests.instances import IntegrationInstance @@ -13,16 +14,16 @@ from tests.integration_tests.instances import IntegrationInstance def _add_dummy_bridge_to_netplan(client: IntegrationInstance): # Update netplan configuration to ensure it doesn't change on reboot netplan = yaml.safe_load( - client.execute('cat /etc/netplan/50-cloud-init.yaml') + client.execute("cat /etc/netplan/50-cloud-init.yaml") ) # Just a dummy bridge to do nothing try: - netplan['network']['bridges']['dummy0'] = {'dhcp4': False} + netplan["network"]["bridges"]["dummy0"] = {"dhcp4": False} except KeyError: - netplan['network']['bridges'] = {'dummy0': {'dhcp4': False}} + netplan["network"]["bridges"] = {"dummy0": {"dhcp4": False}} dumped_netplan = yaml.dump(netplan) - client.write_to_file('/etc/netplan/50-cloud-init.yaml', dumped_netplan) + client.write_to_file("/etc/netplan/50-cloud-init.yaml", dumped_netplan) @pytest.mark.lxd_container @@ -32,19 +33,19 @@ def _add_dummy_bridge_to_netplan(client: IntegrationInstance): @pytest.mark.oci @pytest.mark.openstack def test_boot_event_disabled_by_default(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') - if 'network config is disabled' in log: + log = client.read_from_file("/var/log/cloud-init.log") + if "network config is disabled" in log: pytest.skip("network config disabled. Test doesn't apply") - assert 'Applying network configuration' in log - assert 'dummy0' not in client.execute('ls /sys/class/net') + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") _add_dummy_bridge_to_netplan(client) - client.execute('rm /var/log/cloud-init.log') + client.execute("rm /var/log/cloud-init.log") client.restart() - log2 = client.read_from_file('/var/log/cloud-init.log') + log2 = client.read_from_file("/var/log/cloud-init.log") - if 'cache invalid in datasource' in log2: + if "cache invalid in datasource" in log2: # Invalid cache will get cleared, meaning we'll create a new # "instance" and apply networking config, so events aren't # really relevant here @@ -53,8 +54,9 @@ def test_boot_event_disabled_by_default(client: IntegrationInstance): # We attempt to apply network config twice on every boot. # Ensure neither time works. assert 2 == len( - re.findall(r"Event Denied: scopes=\['network'\] EventType=boot[^-]", - log2) + re.findall( + r"Event Denied: scopes=\['network'\] EventType=boot[^-]", log2 + ) ) assert 2 == log2.count( "Event Denied: scopes=['network'] EventType=boot-legacy" @@ -64,30 +66,30 @@ def test_boot_event_disabled_by_default(client: IntegrationInstance): " nor datasource network update allowed" ) - assert 'dummy0' in client.execute('ls /sys/class/net') + assert "dummy0" in client.execute("ls /sys/class/net") def _test_network_config_applied_on_reboot(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') - if 'network config is disabled' in log: + log = client.read_from_file("/var/log/cloud-init.log") + if "network config is disabled" in log: pytest.skip("network config disabled. 
Test doesn't apply") - assert 'Applying network configuration' in log - assert 'dummy0' not in client.execute('ls /sys/class/net') + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") _add_dummy_bridge_to_netplan(client) client.execute('echo "" > /var/log/cloud-init.log') client.restart() - log = client.read_from_file('/var/log/cloud-init.log') - if 'cache invalid in datasource' in log: + log = client.read_from_file("/var/log/cloud-init.log") + if "cache invalid in datasource" in log: # Invalid cache will get cleared, meaning we'll create a new # "instance" and apply networking config, so events aren't # really relevant here pytest.skip("Test only valid for existing instances") - assert 'Event Allowed: scope=network EventType=boot' in log - assert 'Applying network configuration' in log - assert 'dummy0' not in client.execute('ls /sys/class/net') + assert "Event Allowed: scope=network EventType=boot" in log + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") @pytest.mark.azure diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py index bcb17b7f..3d1358ce 100644 --- a/tests/integration_tests/modules/test_users_groups.py +++ b/tests/integration_tests/modules/test_users_groups.py @@ -11,7 +11,6 @@ import pytest from tests.integration_tests.clouds import ImageSpecification from tests.integration_tests.instances import IntegrationInstance - USER_DATA = """\ #cloud-config # Add groups to the system @@ -84,7 +83,9 @@ class TestUsersGroups: assert re.search(regex, result.stdout) is not None, ( "'getent {}' resulted in '{}', " "but expected to match regex {}".format( - ' '.join(getent_args), result.stdout, regex)) + " ".join(getent_args), result.stdout, regex + ) + ) def test_user_root_in_secret(self, class_client): """Test root user is in 'secret' group.""" @@ -105,19 +106,21 @@ def test_sudoers_includedir(client: IntegrationInstance): https://github.com/canonical/cloud-init/pull/783 """ if ImageSpecification.from_os_image().release in [ - 'xenial', 'bionic', 'focal' + "xenial", + "bionic", + "focal", ]: raise pytest.skip( - 'Test requires version of sudo installed on groovy and later' + "Test requires version of sudo installed on groovy and later" ) client.execute("sed -i 's/#include/@include/g' /etc/sudoers") - sudoers = client.read_from_file('/etc/sudoers') - if '@includedir /etc/sudoers.d' not in sudoers: + sudoers = client.read_from_file("/etc/sudoers") + if "@includedir /etc/sudoers.d" not in sudoers: client.execute("echo '@includedir /etc/sudoers.d' >> /etc/sudoers") client.instance.clean() client.restart() - sudoers = client.read_from_file('/etc/sudoers') + sudoers = client.read_from_file("/etc/sudoers") - assert '#includedir' not in sudoers - assert sudoers.count('includedir /etc/sudoers.d') == 1 + assert "#includedir" not in sudoers + assert sudoers.count("includedir /etc/sudoers.d") == 1 diff --git a/tests/integration_tests/modules/test_version_change.py b/tests/integration_tests/modules/test_version_change.py index f28079d4..3168cd60 100644 --- a/tests/integration_tests/modules/test_version_change.py +++ b/tests/integration_tests/modules/test_version_change.py @@ -5,39 +5,40 @@ import pytest from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.util import ASSETS_DIR, verify_clean_log - -PICKLE_PATH = Path('/var/lib/cloud/instance/obj.pkl') -TEST_PICKLE = ASSETS_DIR 
/ 'test_version_change.pkl' +PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl") +TEST_PICKLE = ASSETS_DIR / "test_version_change.pkl" def _assert_no_pickle_problems(log): - assert 'Failed loading pickled blob' not in log + assert "Failed loading pickled blob" not in log verify_clean_log(log) def test_reboot_without_version_change(client: IntegrationInstance): - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Python version change detected' not in log - assert 'Cache compatibility status is currently unknown.' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected" not in log + assert "Cache compatibility status is currently unknown." not in log _assert_no_pickle_problems(log) client.restart() - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Python version change detected' not in log - assert 'Could not determine Python version used to write cache' not in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected" not in log + assert "Could not determine Python version used to write cache" not in log _assert_no_pickle_problems(log) # Now ensure that loading a bad pickle gives us problems client.push_file(TEST_PICKLE, PICKLE_PATH) client.restart() - log = client.read_from_file('/var/log/cloud-init.log') + log = client.read_from_file("/var/log/cloud-init.log") # no cache found is an "expected" upgrade error, and # "Failed" means we're unable to load the pickle - assert any([ - 'Failed loading pickled blob from {}'.format(PICKLE_PATH) in log, - 'no cache found' in log - ]) + assert any( + [ + "Failed loading pickled blob from {}".format(PICKLE_PATH) in log, + "no cache found" in log, + ] + ) @pytest.mark.ec2 @@ -54,8 +55,8 @@ def test_cache_purged_on_version_change(client: IntegrationInstance): client.push_file(TEST_PICKLE, PICKLE_PATH) client.execute("echo '1.0' > /var/lib/cloud/data/python-version") client.restart() - log = client.read_from_file('/var/log/cloud-init.log') - assert 'Python version change detected. Purging cache' in log + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected. Purging cache" in log _assert_no_pickle_problems(log) @@ -65,11 +66,11 @@ def test_log_message_on_missing_version_file(client: IntegrationInstance): client.execute("rm /var/lib/cloud/data/python-version") client.execute("rm /var/log/cloud-init.log") client.restart() - log = client.read_from_file('/var/log/cloud-init.log') - if 'no cache found' not in log: + log = client.read_from_file("/var/log/cloud-init.log") + if "no cache found" not in log: # We don't expect the python version file to exist if we have no # pre-existing cache assert ( - 'Writing python-version file. ' - 'Cache compatibility status is currently unknown.' - ) in log + "Writing python-version file. " + "Cache compatibility status is currently unknown." in log + ) diff --git a/tests/integration_tests/modules/test_write_files.py b/tests/integration_tests/modules/test_write_files.py index 1d532fac..1eb7e945 100644 --- a/tests/integration_tests/modules/test_write_files.py +++ b/tests/integration_tests/modules/test_write_files.py @@ -7,8 +7,8 @@ and then checks if those files were created during boot. 
``tests/cloud_tests/testcases/modules/write_files.yaml``.)""" import base64 -import pytest +import pytest ASCII_TEXT = "ASCII text" B64_CONTENT = base64.b64encode(ASCII_TEXT.encode("utf-8")) @@ -50,25 +50,30 @@ write_files: defer: true owner: 'myuser' permissions: '0644' -""".format(B64_CONTENT.decode("ascii")) +""".format( + B64_CONTENT.decode("ascii") +) @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestWriteFiles: - @pytest.mark.parametrize( - "cmd,expected_out", ( + "cmd,expected_out", + ( ("file /root/file_b64", ASCII_TEXT), ("md5sum </root/file_binary", "3801184b97bb8c6e63fa0e1eae2920d7"), - ("sha256sum </root/file_binary", ( + ( + "sha256sum </root/file_binary", "2c791c4037ea5bd7e928d6a87380f8ba" - "7a803cd83d5e4f269e28f5090f0f2c9a" - )), - ("file /root/file_gzip", - "POSIX shell script, ASCII text executable"), + "7a803cd83d5e4f269e28f5090f0f2c9a", + ), + ( + "file /root/file_gzip", + "POSIX shell script, ASCII text executable", + ), ("file /root/file_text", ASCII_TEXT), - ) + ), ) def test_write_files(self, cmd, expected_out, class_client): out = class_client.execute(cmd) @@ -82,6 +87,7 @@ class TestWriteFiles: """ out = class_client.read_from_file("/home/testuser/my-file") assert "echo 'hello world!'" == out - assert class_client.execute( - 'stat -c "%U %a" /home/testuser/my-file' - ) == 'myuser 644' + assert ( + class_client.execute('stat -c "%U %a" /home/testuser/my-file') + == "myuser 644" + )
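The edits in this changeset are consistent with running an auto-formatter such as black (plus an import sorter such as isort) over the test suite: quotes are normalized to double quotes, call arguments are re-wrapped with trailing commas, and imports are regrouped. A minimal sketch of reproducing that normalization on one line, assuming the black package is available; the 79-character line length is an assumed setting, not something stated in the diff:

# Sketch only: reproduce black-style quote normalization on a single line.
# line_length=79 is an assumed project setting, not taken from this diff.
import black

SRC = "sources_list = class_client.read_from_file('/etc/apt/sources.list')\n"

formatted = black.format_str(SRC, mode=black.Mode(line_length=79))
print(formatted)  # same call, but with the string rewritten in double quotes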
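In test_ssh_keysfile.py, common_verify() turns an in-memory private key into a paramiko key object and opens a real SSH session to prove the configured authorized_keys file is honoured. A standalone sketch of that pattern; the throwaway generated key and the commented-out host address are placeholders, not values from the tests:

from io import StringIO

import paramiko

# Throwaway key purely for illustration; the tests instead load keys
# produced by get_test_rsa_keypair().
key = paramiko.RSAKey.generate(2048)
buf = StringIO()
key.write_private_key(buf)
buf.seek(0)
pkey = paramiko.RSAKey.from_private_key(buf)

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# ssh.connect("198.51.100.10", username="test_user1", pkey=pkey)
# connect() raises an SSHException subclass when the key is rejected,
# which is what the tests assert with pytest.raises(SSHException).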
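_add_dummy_bridge_to_netplan() in test_user_events.py only has to splice a no-op bridge into the rendered netplan YAML so a later reboot does not rewrite it. A sketch of the same dictionary manipulation against a local string, assuming PyYAML; the ethernet stanza below is illustrative, not the file cloud-init actually renders:

import yaml

# Stand-in for /etc/netplan/50-cloud-init.yaml.
RENDERED = """\
network:
  version: 2
  ethernets:
    eth0:
      dhcp4: true
"""

netplan = yaml.safe_load(RENDERED)
# Add a bridge that does nothing; create the 'bridges' mapping if absent.
try:
    netplan["network"]["bridges"]["dummy0"] = {"dhcp4": False}
except KeyError:
    netplan["network"]["bridges"] = {"dummy0": {"dhcp4": False}}

print(yaml.dump(netplan))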
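The assertions in test_boot_event_disabled_by_default rely on the trailing [^-] in the regex to keep plain EventType=boot matches from also counting EventType=boot-legacy lines. A quick check of that behaviour on a synthetic two-line log; the log text mirrors the messages quoted in the diff:

import re

log = (
    "Event Denied: scopes=['network'] EventType=boot\n"
    "Event Denied: scopes=['network'] EventType=boot-legacy\n"
)

boot_only = re.findall(
    r"Event Denied: scopes=\['network'\] EventType=boot[^-]", log
)
assert len(boot_only) == 1  # the newline after 'boot' satisfies [^-]
assert log.count("Event Denied: scopes=['network'] EventType=boot-legacy") == 1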
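test_cache_purged_on_version_change forces a mismatch by writing '1.0' into /var/lib/cloud/data/python-version and then looks for the purge message. The sketch below only illustrates the kind of comparison the test provokes; it is not cloud-init's implementation:

import sys

stored = "1.0"  # what the test writes into /var/lib/cloud/data/python-version
current = "%d.%d" % (sys.version_info.major, sys.version_info.minor)
if stored != current:
    print("Python version change detected. Purging cache")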
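test_write_files.py embeds base64-encoded content in its #cloud-config and later checks the decoded result on disk. A minimal round-trip sketch using the same ASCII_TEXT value; everything else is illustrative:

import base64

ASCII_TEXT = "ASCII text"

# Encode for the write_files entry in the user-data...
b64_content = base64.b64encode(ASCII_TEXT.encode("utf-8"))

# ...and decode the way the written file is expected to read back.
assert base64.b64decode(b64_content).decode("utf-8") == ASCII_TEXT
print(b64_content.decode("ascii"))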