diff options
Diffstat (limited to 'tests/integration_tests/conftest.py')
-rw-r--r-- | tests/integration_tests/conftest.py | 273 |
1 files changed, 211 insertions, 62 deletions
diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 73b44bfc..a90a5d49 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -1,33 +1,51 @@ # This file is part of cloud-init. See LICENSE file for license information. +import datetime +import functools import logging import os -import pytest import sys from contextlib import contextmanager +from pathlib import Path +from tarfile import TarFile +from typing import Dict, Type + +import pytest +from pycloudlib.lxd.instance import LXDInstance from tests.integration_tests import integration_settings from tests.integration_tests.clouds import ( + AzureCloud, Ec2Cloud, GceCloud, - AzureCloud, - OciCloud, + ImageSpecification, + IntegrationCloud, LxdContainerCloud, LxdVmCloud, + OciCloud, + OpenstackCloud, + _LxdIntegrationCloud, +) +from tests.integration_tests.instances import ( + CloudInitSource, + IntegrationInstance, ) - -log = logging.getLogger('integration_testing') +log = logging.getLogger("integration_testing") log.addHandler(logging.StreamHandler(sys.stdout)) log.setLevel(logging.INFO) -platforms = { - 'ec2': Ec2Cloud, - 'gce': GceCloud, - 'azure': AzureCloud, - 'oci': OciCloud, - 'lxd_container': LxdContainerCloud, - 'lxd_vm': LxdVmCloud, +platforms: Dict[str, Type[IntegrationCloud]] = { + "ec2": Ec2Cloud, + "gce": GceCloud, + "azure": AzureCloud, + "oci": OciCloud, + "lxd_container": LxdContainerCloud, + "lxd_vm": LxdVmCloud, + "openstack": OpenstackCloud, } +os_list = ["ubuntu"] + +session_start_time = datetime.datetime.now().strftime("%y%m%d%H%M%S") def pytest_runtest_setup(item): @@ -42,18 +60,30 @@ def pytest_runtest_setup(item): test_marks = [mark.name for mark in item.iter_markers()] supported_platforms = set(all_platforms).intersection(test_marks) current_platform = integration_settings.PLATFORM - unsupported_message = 'Cannot run on platform {}'.format(current_platform) - if 'no_container' in test_marks: - if 'lxd_container' in test_marks: + unsupported_message = "Cannot run on platform {}".format(current_platform) + if "no_container" in test_marks: + if "lxd_container" in test_marks: raise Exception( - 'lxd_container and no_container marks simultaneously set ' - 'on test' + "lxd_container and no_container marks simultaneously set " + "on test" ) - if current_platform == 'lxd_container': + if current_platform == "lxd_container": pytest.skip(unsupported_message) if supported_platforms and current_platform not in supported_platforms: pytest.skip(unsupported_message) + image = ImageSpecification.from_os_image() + current_os = image.os + supported_os_set = set(os_list).intersection(test_marks) + if current_os and supported_os_set and current_os not in supported_os_set: + pytest.skip("Cannot run on OS {}".format(current_os)) + if "unstable" in test_marks and not integration_settings.RUN_UNSTABLE: + pytest.skip("Test marked unstable. Manually remove mark to run it") + + current_release = image.release + if "not_{}".format(current_release) in test_marks: + pytest.skip("Cannot run on release {}".format(current_release)) + # disable_subp_usage is defined at a higher level, but we don't # want it applied here @@ -62,7 +92,7 @@ def disable_subp_usage(request): pass -@pytest.yield_fixture(scope='session') +@pytest.fixture(scope="session") def session_cloud(): if integration_settings.PLATFORM not in platforms.keys(): raise ValueError( @@ -74,83 +104,185 @@ def session_cloud(): cloud = platforms[integration_settings.PLATFORM]() cloud.emit_settings_to_log() + yield cloud + cloud.destroy() -@pytest.fixture(scope='session', autouse=True) -def setup_image(session_cloud): +def get_validated_source( + session_cloud: IntegrationCloud, + source=integration_settings.CLOUD_INIT_SOURCE, +) -> CloudInitSource: + if source == "NONE": + return CloudInitSource.NONE + elif source == "IN_PLACE": + if session_cloud.datasource not in ["lxd_container", "lxd_vm"]: + raise ValueError( + "IN_PLACE as CLOUD_INIT_SOURCE only works for LXD" + ) + return CloudInitSource.IN_PLACE + elif source == "PROPOSED": + return CloudInitSource.PROPOSED + elif source.startswith("ppa:"): + return CloudInitSource.PPA + elif os.path.isfile(str(source)): + return CloudInitSource.DEB_PACKAGE + elif source == "UPGRADE": + return CloudInitSource.UPGRADE + raise ValueError( + "Invalid value for CLOUD_INIT_SOURCE setting: {}".format(source) + ) + + +@pytest.fixture(scope="session") +def setup_image(session_cloud: IntegrationCloud, request): """Setup the target environment with the correct version of cloud-init. So we can launch instances / run tests with the correct image """ - client = None - log.info('Setting up environment for %s', session_cloud.datasource) - if integration_settings.CLOUD_INIT_SOURCE == 'NONE': - pass # that was easy - elif integration_settings.CLOUD_INIT_SOURCE == 'IN_PLACE': - if session_cloud.datasource not in ['lxd_container', 'lxd_vm']: - raise ValueError( - 'IN_PLACE as CLOUD_INIT_SOURCE only works for LXD') - # The mount needs to happen after the instance is created, so - # no further action needed here - elif integration_settings.CLOUD_INIT_SOURCE == 'PROPOSED': - client = session_cloud.launch() - client.install_proposed_image() - elif integration_settings.CLOUD_INIT_SOURCE.startswith('ppa:'): - client = session_cloud.launch() - client.install_ppa(integration_settings.CLOUD_INIT_SOURCE) - elif os.path.isfile(str(integration_settings.CLOUD_INIT_SOURCE)): - client = session_cloud.launch() - client.install_deb() - else: - raise ValueError( - 'Invalid value for CLOUD_INIT_SOURCE setting: {}'.format( - integration_settings.CLOUD_INIT_SOURCE)) - if client: - # Even if we're keeping instances, we don't want to keep this - # one around as it was just for image creation - client.destroy() - log.info('Done with environment setup') + + source = get_validated_source(session_cloud) + if not source.installs_new_version(): + return + log.info("Setting up environment for %s", session_cloud.datasource) + client = session_cloud.launch() + client.install_new_cloud_init(source) + # Even if we're keeping instances, we don't want to keep this + # one around as it was just for image creation + client.destroy() + log.info("Done with environment setup") + + # For some reason a yield here raises a + # ValueError: setup_image did not yield a value + # during setup so use a finalizer instead. + request.addfinalizer(session_cloud.delete_snapshot) + + +def _collect_logs( + instance: IntegrationInstance, node_id: str, test_failed: bool +): + """Collect logs from remote instance. + + Args: + instance: The current IntegrationInstance to collect logs from + node_id: The pytest representation of this test, E.g.: + tests/integration_tests/test_example.py::TestExample.test_example + test_failed: If test failed or not + """ + if any( + [ + integration_settings.COLLECT_LOGS == "NEVER", + integration_settings.COLLECT_LOGS == "ON_ERROR" + and not test_failed, + ] + ): + return + instance.execute( + "cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz" + ) + node_id_path = Path( + node_id.replace( + ".py", "" + ) # Having a directory with '.py' would be weird + .replace("::", os.path.sep) # Turn classes/tests into paths + .replace("[", "-") # For parametrized names + .replace("]", "") # For parameterized names + ) + log_dir = ( + Path(integration_settings.LOCAL_LOG_PATH) + / session_start_time + / node_id_path + ) + log.info("Writing logs to %s", log_dir) + + if not log_dir.exists(): + log_dir.mkdir(parents=True) + + # Add a symlink to the latest log output directory + last_symlink = Path(integration_settings.LOCAL_LOG_PATH) / "last" + if os.path.islink(last_symlink): + os.unlink(last_symlink) + os.symlink(log_dir.parent, last_symlink) + + tarball_path = log_dir / "cloud-init.tar.gz" + try: + instance.pull_file("/var/tmp/cloud-init.tar.gz", tarball_path) + except Exception as e: + log.error("Failed to pull logs: %s", e) + return + + tarball = TarFile.open(str(tarball_path)) + tarball.extractall(path=str(log_dir)) + tarball_path.unlink() @contextmanager -def _client(request, fixture_utils, session_cloud): +def _client(request, fixture_utils, session_cloud: IntegrationCloud): """Fixture implementation for the client fixtures. Launch the dynamic IntegrationClient instance using any provided userdata, yield to the test, then cleanup """ - user_data = fixture_utils.closest_marker_first_arg_or( - request, 'user_data', None) - name = fixture_utils.closest_marker_first_arg_or( - request, 'instance_name', None + getter = functools.partial( + fixture_utils.closest_marker_first_arg_or, request, default=None + ) + user_data = getter("user_data") + name = getter("instance_name") + lxd_config_dict = getter("lxd_config_dict") + lxd_setup = getter("lxd_setup") + lxd_use_exec = fixture_utils.closest_marker_args_or( + request, "lxd_use_exec", None ) + launch_kwargs = {} if name is not None: - launch_kwargs = {"name": name} + launch_kwargs["name"] = name + if lxd_config_dict is not None: + if not isinstance(session_cloud, _LxdIntegrationCloud): + pytest.skip("lxd_config_dict requires LXD") + launch_kwargs["config_dict"] = lxd_config_dict + if lxd_use_exec is not None: + if not isinstance(session_cloud, _LxdIntegrationCloud): + pytest.skip("lxd_use_exec requires LXD") + launch_kwargs["execute_via_ssh"] = False + local_launch_kwargs = {} + if lxd_setup is not None: + if not isinstance(session_cloud, _LxdIntegrationCloud): + pytest.skip("lxd_setup requires LXD") + local_launch_kwargs["lxd_setup"] = lxd_setup + with session_cloud.launch( - user_data=user_data, launch_kwargs=launch_kwargs + user_data=user_data, launch_kwargs=launch_kwargs, **local_launch_kwargs ) as instance: + if lxd_use_exec is not None and isinstance( + instance.instance, LXDInstance + ): + # Existing instances are not affected by the launch kwargs, so + # ensure it here; we still need the launch kwarg so waiting works + instance.instance.execute_via_ssh = False + previous_failures = request.session.testsfailed yield instance + test_failed = request.session.testsfailed - previous_failures > 0 + _collect_logs(instance, request.node.nodeid, test_failed) -@pytest.yield_fixture -def client(request, fixture_utils, session_cloud): +@pytest.fixture +def client(request, fixture_utils, session_cloud, setup_image): """Provide a client that runs for every test.""" with _client(request, fixture_utils, session_cloud) as client: yield client -@pytest.yield_fixture(scope='module') -def module_client(request, fixture_utils, session_cloud): +@pytest.fixture(scope="module") +def module_client(request, fixture_utils, session_cloud, setup_image): """Provide a client that runs once per module.""" with _client(request, fixture_utils, session_cloud) as client: yield client -@pytest.yield_fixture(scope='class') -def class_client(request, fixture_utils, session_cloud): +@pytest.fixture(scope="class") +def class_client(request, fixture_utils, session_cloud, setup_image): """Provide a client that runs once per class.""" with _client(request, fixture_utils, session_cloud) as client: yield client @@ -180,3 +312,20 @@ def pytest_assertrepr_compare(op, left, right): '"{}" not in cloud-init.log string; unexpectedly found on' " these lines:".format(left) ] + found_lines + + +def pytest_configure(config): + """Perform initial configuration, before the test runs start. + + This hook is only called if integration tests are being executed, so we can + use it to configure defaults for integration testing that differ from the + rest of the tests in the codebase. + + See + https://docs.pytest.org/en/latest/reference.html#_pytest.hookspec.pytest_configure + for pytest's documentation. + """ + if "log_cli_level" in config.option and not config.option.log_cli_level: + # If log_cli_level is available in this version of pytest and not set + # to anything, set it to INFO. + config.option.log_cli_level = "INFO" |