summaryrefslogtreecommitdiff
path: root/tests/integration_tests/conftest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration_tests/conftest.py')
-rw-r--r--tests/integration_tests/conftest.py130
1 files changed, 70 insertions, 60 deletions
diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py
index 5eab5a45..b14b6ad0 100644
--- a/tests/integration_tests/conftest.py
+++ b/tests/integration_tests/conftest.py
@@ -2,12 +2,13 @@
import datetime
import functools
import logging
-import pytest
import os
import sys
-from tarfile import TarFile
from contextlib import contextmanager
from pathlib import Path
+from tarfile import TarFile
+
+import pytest
from tests.integration_tests import integration_settings
from tests.integration_tests.clouds import (
@@ -19,31 +20,30 @@ from tests.integration_tests.clouds import (
LxdContainerCloud,
LxdVmCloud,
OciCloud,
- _LxdIntegrationCloud,
OpenstackCloud,
+ _LxdIntegrationCloud,
)
from tests.integration_tests.instances import (
CloudInitSource,
IntegrationInstance,
)
-
-log = logging.getLogger('integration_testing')
+log = logging.getLogger("integration_testing")
log.addHandler(logging.StreamHandler(sys.stdout))
log.setLevel(logging.INFO)
platforms = {
- 'ec2': Ec2Cloud,
- 'gce': GceCloud,
- 'azure': AzureCloud,
- 'oci': OciCloud,
- 'lxd_container': LxdContainerCloud,
- 'lxd_vm': LxdVmCloud,
- 'openstack': OpenstackCloud,
+ "ec2": Ec2Cloud,
+ "gce": GceCloud,
+ "azure": AzureCloud,
+ "oci": OciCloud,
+ "lxd_container": LxdContainerCloud,
+ "lxd_vm": LxdVmCloud,
+ "openstack": OpenstackCloud,
}
os_list = ["ubuntu"]
-session_start_time = datetime.datetime.now().strftime('%y%m%d%H%M%S')
+session_start_time = datetime.datetime.now().strftime("%y%m%d%H%M%S")
XENIAL_LXD_VM_EXEC_MSG = """\
The default xenial images do not support `exec` for LXD VMs.
@@ -69,14 +69,14 @@ def pytest_runtest_setup(item):
test_marks = [mark.name for mark in item.iter_markers()]
supported_platforms = set(all_platforms).intersection(test_marks)
current_platform = integration_settings.PLATFORM
- unsupported_message = 'Cannot run on platform {}'.format(current_platform)
- if 'no_container' in test_marks:
- if 'lxd_container' in test_marks:
+ unsupported_message = "Cannot run on platform {}".format(current_platform)
+ if "no_container" in test_marks:
+ if "lxd_container" in test_marks:
raise Exception(
- 'lxd_container and no_container marks simultaneously set '
- 'on test'
+ "lxd_container and no_container marks simultaneously set "
+ "on test"
)
- if current_platform == 'lxd_container':
+ if current_platform == "lxd_container":
pytest.skip(unsupported_message)
if supported_platforms and current_platform not in supported_platforms:
pytest.skip(unsupported_message)
@@ -86,8 +86,8 @@ def pytest_runtest_setup(item):
supported_os_set = set(os_list).intersection(test_marks)
if current_os and supported_os_set and current_os not in supported_os_set:
pytest.skip("Cannot run on OS {}".format(current_os))
- if 'unstable' in test_marks and not integration_settings.RUN_UNSTABLE:
- pytest.skip('Test marked unstable. Manually remove mark to run it')
+ if "unstable" in test_marks and not integration_settings.RUN_UNSTABLE:
+ pytest.skip("Test marked unstable. Manually remove mark to run it")
current_release = image.release
if "not_{}".format(current_release) in test_marks:
@@ -101,7 +101,7 @@ def disable_subp_usage(request):
pass
-@pytest.yield_fixture(scope='session')
+@pytest.yield_fixture(scope="session")
def session_cloud():
if integration_settings.PLATFORM not in platforms.keys():
raise ValueError(
@@ -122,28 +122,30 @@ def session_cloud():
def get_validated_source(
session_cloud: IntegrationCloud,
- source=integration_settings.CLOUD_INIT_SOURCE
+ source=integration_settings.CLOUD_INIT_SOURCE,
) -> CloudInitSource:
- if source == 'NONE':
+ if source == "NONE":
return CloudInitSource.NONE
- elif source == 'IN_PLACE':
- if session_cloud.datasource not in ['lxd_container', 'lxd_vm']:
+ elif source == "IN_PLACE":
+ if session_cloud.datasource not in ["lxd_container", "lxd_vm"]:
raise ValueError(
- 'IN_PLACE as CLOUD_INIT_SOURCE only works for LXD')
+ "IN_PLACE as CLOUD_INIT_SOURCE only works for LXD"
+ )
return CloudInitSource.IN_PLACE
- elif source == 'PROPOSED':
+ elif source == "PROPOSED":
return CloudInitSource.PROPOSED
- elif source.startswith('ppa:'):
+ elif source.startswith("ppa:"):
return CloudInitSource.PPA
elif os.path.isfile(str(source)):
return CloudInitSource.DEB_PACKAGE
elif source == "UPGRADE":
return CloudInitSource.UPGRADE
raise ValueError(
- 'Invalid value for CLOUD_INIT_SOURCE setting: {}'.format(source))
+ "Invalid value for CLOUD_INIT_SOURCE setting: {}".format(source)
+ )
-@pytest.fixture(scope='session')
+@pytest.fixture(scope="session")
def setup_image(session_cloud: IntegrationCloud):
"""Setup the target environment with the correct version of cloud-init.
@@ -153,17 +155,18 @@ def setup_image(session_cloud: IntegrationCloud):
source = get_validated_source(session_cloud)
if not source.installs_new_version():
return
- log.info('Setting up environment for %s', session_cloud.datasource)
+ log.info("Setting up environment for %s", session_cloud.datasource)
client = session_cloud.launch()
client.install_new_cloud_init(source)
# Even if we're keeping instances, we don't want to keep this
# one around as it was just for image creation
client.destroy()
- log.info('Done with environment setup')
+ log.info("Done with environment setup")
-def _collect_logs(instance: IntegrationInstance, node_id: str,
- test_failed: bool):
+def _collect_logs(
+ instance: IntegrationInstance, node_id: str, test_failed: bool
+):
"""Collect logs from remote instance.
Args:
@@ -172,36 +175,43 @@ def _collect_logs(instance: IntegrationInstance, node_id: str,
tests/integration_tests/test_example.py::TestExample.test_example
test_failed: If test failed or not
"""
- if any([
- integration_settings.COLLECT_LOGS == 'NEVER',
- integration_settings.COLLECT_LOGS == 'ON_ERROR' and not test_failed
- ]):
+ if any(
+ [
+ integration_settings.COLLECT_LOGS == "NEVER",
+ integration_settings.COLLECT_LOGS == "ON_ERROR"
+ and not test_failed,
+ ]
+ ):
return
instance.execute(
- 'cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz')
+ "cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz"
+ )
node_id_path = Path(
- node_id
- .replace('.py', '') # Having a directory with '.py' would be weird
- .replace('::', os.path.sep) # Turn classes/tests into paths
- .replace('[', '-') # For parametrized names
- .replace(']', '') # For parameterized names
+ node_id.replace(
+ ".py", ""
+ ) # Having a directory with '.py' would be weird
+ .replace("::", os.path.sep) # Turn classes/tests into paths
+ .replace("[", "-") # For parametrized names
+ .replace("]", "") # For parameterized names
+ )
+ log_dir = (
+ Path(integration_settings.LOCAL_LOG_PATH)
+ / session_start_time
+ / node_id_path
)
- log_dir = Path(
- integration_settings.LOCAL_LOG_PATH
- ) / session_start_time / node_id_path
log.info("Writing logs to %s", log_dir)
if not log_dir.exists():
log_dir.mkdir(parents=True)
# Add a symlink to the latest log output directory
- last_symlink = Path(integration_settings.LOCAL_LOG_PATH) / 'last'
+ last_symlink = Path(integration_settings.LOCAL_LOG_PATH) / "last"
if os.path.islink(last_symlink):
os.unlink(last_symlink)
os.symlink(log_dir.parent, last_symlink)
- tarball_path = log_dir / 'cloud-init.tar.gz'
- instance.pull_file('/var/tmp/cloud-init.tar.gz', tarball_path)
+ tarball_path = log_dir / "cloud-init.tar.gz"
+ instance.pull_file("/var/tmp/cloud-init.tar.gz", tarball_path)
tarball = TarFile.open(str(tarball_path))
tarball.extractall(path=str(log_dir))
@@ -218,12 +228,12 @@ def _client(request, fixture_utils, session_cloud: IntegrationCloud):
getter = functools.partial(
fixture_utils.closest_marker_first_arg_or, request, default=None
)
- user_data = getter('user_data')
- name = getter('instance_name')
- lxd_config_dict = getter('lxd_config_dict')
- lxd_setup = getter('lxd_setup')
+ user_data = getter("user_data")
+ name = getter("instance_name")
+ lxd_config_dict = getter("lxd_config_dict")
+ lxd_setup = getter("lxd_setup")
lxd_use_exec = fixture_utils.closest_marker_args_or(
- request, 'lxd_use_exec', None
+ request, "lxd_use_exec", None
)
launch_kwargs = {}
@@ -250,8 +260,8 @@ def _client(request, fixture_utils, session_cloud: IntegrationCloud):
local_launch_kwargs = {}
if lxd_setup is not None:
if not isinstance(session_cloud, _LxdIntegrationCloud):
- pytest.skip('lxd_setup requires LXD')
- local_launch_kwargs['lxd_setup'] = lxd_setup
+ pytest.skip("lxd_setup requires LXD")
+ local_launch_kwargs["lxd_setup"] = lxd_setup
with session_cloud.launch(
user_data=user_data, launch_kwargs=launch_kwargs, **local_launch_kwargs
@@ -273,14 +283,14 @@ def client(request, fixture_utils, session_cloud, setup_image):
yield client
-@pytest.yield_fixture(scope='module')
+@pytest.yield_fixture(scope="module")
def module_client(request, fixture_utils, session_cloud, setup_image):
"""Provide a client that runs once per module."""
with _client(request, fixture_utils, session_cloud) as client:
yield client
-@pytest.yield_fixture(scope='class')
+@pytest.yield_fixture(scope="class")
def class_client(request, fixture_utils, session_cloud, setup_image):
"""Provide a client that runs once per class."""
with _client(request, fixture_utils, session_cloud) as client: