summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/cloud_tests/testcases/examples/TODO.md2
-rw-r--r--tests/integration_tests/bugs/test_gh570.py39
-rw-r--r--tests/integration_tests/bugs/test_gh626.py42
-rw-r--r--tests/integration_tests/bugs/test_gh632.py33
-rw-r--r--tests/integration_tests/bugs/test_gh668.py37
-rw-r--r--tests/integration_tests/bugs/test_gh671.py55
-rw-r--r--tests/integration_tests/bugs/test_lp1813396.py34
-rw-r--r--tests/integration_tests/bugs/test_lp1898997.py71
-rw-r--r--tests/integration_tests/bugs/test_lp1900837.py3
-rw-r--r--tests/integration_tests/bugs/test_lp1910835.py66
-rw-r--r--tests/integration_tests/clouds.py140
-rw-r--r--tests/integration_tests/conftest.py180
-rw-r--r--tests/integration_tests/instances.py104
-rw-r--r--tests/integration_tests/integration_settings.py52
-rw-r--r--tests/integration_tests/log_utils.py11
-rw-r--r--tests/integration_tests/modules/test_apt_configure_sources_list.py1
-rw-r--r--tests/integration_tests/modules/test_ca_certs.py91
-rw-r--r--tests/integration_tests/modules/test_cli.py45
-rw-r--r--tests/integration_tests/modules/test_lxd_bridge.py48
-rw-r--r--tests/integration_tests/modules/test_package_update_upgrade_install.py1
-rw-r--r--tests/integration_tests/modules/test_power_state_change.py90
-rw-r--r--tests/integration_tests/modules/test_seed_random_data.py2
-rw-r--r--tests/integration_tests/modules/test_snap.py1
-rw-r--r--tests/integration_tests/modules/test_ssh_import_id.py5
-rw-r--r--tests/integration_tests/modules/test_ssh_keys_provided.py138
-rw-r--r--tests/integration_tests/modules/test_users_groups.py5
-rw-r--r--tests/integration_tests/test_upgrade.py98
-rw-r--r--tests/unittests/test_datasource/test_azure.py44
-rw-r--r--tests/unittests/test_datasource/test_ovf.py291
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py298
-rw-r--r--tests/unittests/test_net.py10
-rw-r--r--tests/unittests/test_vmware_config_file.py16
32 files changed, 1757 insertions, 296 deletions
diff --git a/tests/cloud_tests/testcases/examples/TODO.md b/tests/cloud_tests/testcases/examples/TODO.md
index 8db0e98e..cde699a7 100644
--- a/tests/cloud_tests/testcases/examples/TODO.md
+++ b/tests/cloud_tests/testcases/examples/TODO.md
@@ -6,7 +6,7 @@ Below lists each of the issing examples and why it is not currently added.
- Puppet (takes > 60 seconds to run)
- Manage resolve.conf (lxd backend overrides changes)
- Adding a yum repository (need centos system)
- - Register RedHat Subscription (need centos system + subscription)
+ - Register Red Hat Subscription (need centos system + subscription)
- Adjust mount points mounted (need multiple disks)
- Call a url when finished (need end point)
- Reboot/poweroff when finished (how to test)
diff --git a/tests/integration_tests/bugs/test_gh570.py b/tests/integration_tests/bugs/test_gh570.py
new file mode 100644
index 00000000..534cfb9a
--- /dev/null
+++ b/tests/integration_tests/bugs/test_gh570.py
@@ -0,0 +1,39 @@
+"""Integration test for #570.
+
+Test that we can add optional vendor-data to the seedfrom file in a
+NoCloud environment
+"""
+
+from tests.integration_tests.instances import IntegrationInstance
+import pytest
+
+VENDOR_DATA = """\
+#cloud-config
+runcmd:
+ - touch /var/tmp/seeded_vendordata_test_file
+"""
+
+
+# Only running on LXD because we need NoCloud for this test
+@pytest.mark.sru_2020_11
+@pytest.mark.lxd_container
+@pytest.mark.lxd_vm
+def test_nocloud_seedfrom_vendordata(client: IntegrationInstance):
+ seed_dir = '/var/tmp/test_seed_dir'
+ result = client.execute(
+ "mkdir {seed_dir} && "
+ "touch {seed_dir}/user-data && "
+ "touch {seed_dir}/meta-data && "
+ "echo 'seedfrom: {seed_dir}/' > "
+ "/var/lib/cloud/seed/nocloud-net/meta-data".format(seed_dir=seed_dir)
+ )
+ assert result.return_code == 0
+
+ client.write_to_file(
+ '{}/vendor-data'.format(seed_dir),
+ VENDOR_DATA,
+ )
+ client.execute('cloud-init clean --logs')
+ client.restart()
+ assert client.execute('cloud-init status').ok
+ assert 'seeded_vendordata_test_file' in client.execute('ls /var/tmp')
diff --git a/tests/integration_tests/bugs/test_gh626.py b/tests/integration_tests/bugs/test_gh626.py
new file mode 100644
index 00000000..2d336462
--- /dev/null
+++ b/tests/integration_tests/bugs/test_gh626.py
@@ -0,0 +1,42 @@
+"""Integration test for gh-626.
+
+Ensure if wakeonlan is specified in the network config that it is rendered
+in the /etc/network/interfaces or netplan config.
+"""
+
+import pytest
+import yaml
+
+from tests.integration_tests.clouds import ImageSpecification
+from tests.integration_tests.instances import IntegrationInstance
+
+
+NETWORK_CONFIG = """\
+version: 2
+ethernets:
+ eth0:
+ dhcp4: true
+ wakeonlan: true
+"""
+
+EXPECTED_ENI_END = """\
+iface eth0 inet dhcp
+ ethernet-wol g"""
+
+
+@pytest.mark.sru_2020_11
+@pytest.mark.lxd_container
+@pytest.mark.lxd_vm
+@pytest.mark.lxd_config_dict({
+ 'user.network-config': NETWORK_CONFIG
+})
+def test_wakeonlan(client: IntegrationInstance):
+ if ImageSpecification.from_os_image().release == 'xenial':
+ eni = client.execute('cat /etc/network/interfaces.d/50-cloud-init.cfg')
+ assert eni.endswith(EXPECTED_ENI_END)
+ return
+
+ netplan_cfg = client.execute('cat /etc/netplan/50-cloud-init.yaml')
+ netplan_yaml = yaml.safe_load(netplan_cfg)
+ assert 'wakeonlan' in netplan_yaml['network']['ethernets']['eth0']
+ assert netplan_yaml['network']['ethernets']['eth0']['wakeonlan'] is True
diff --git a/tests/integration_tests/bugs/test_gh632.py b/tests/integration_tests/bugs/test_gh632.py
new file mode 100644
index 00000000..3c1f9347
--- /dev/null
+++ b/tests/integration_tests/bugs/test_gh632.py
@@ -0,0 +1,33 @@
+"""Integration test for gh-632.
+
+Verify that if cloud-init is using DataSourceRbxCloud, there is
+no traceback if the metadata disk cannot be found.
+"""
+
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+
+
+# With some datasource hacking, we can run this on a NoCloud instance
+@pytest.mark.lxd_container
+@pytest.mark.lxd_vm
+@pytest.mark.sru_2020_11
+def test_datasource_rbx_no_stacktrace(client: IntegrationInstance):
+ client.write_to_file(
+ '/etc/cloud/cloud.cfg.d/90_dpkg.cfg',
+ 'datasource_list: [ RbxCloud, NoCloud ]\n',
+ )
+ client.write_to_file(
+ '/etc/cloud/ds-identify.cfg',
+ 'policy: enabled\n',
+ )
+ client.execute('cloud-init clean --logs')
+ client.restart()
+
+ log = client.read_from_file('/var/log/cloud-init.log')
+ assert 'WARNING' not in log
+ assert 'Traceback' not in log
+ assert 'Failed to load metadata and userdata' not in log
+ assert ("Getting data from <class 'cloudinit.sources.DataSourceRbxCloud."
+ "DataSourceRbxCloud'> failed") not in log
diff --git a/tests/integration_tests/bugs/test_gh668.py b/tests/integration_tests/bugs/test_gh668.py
new file mode 100644
index 00000000..a3a0c374
--- /dev/null
+++ b/tests/integration_tests/bugs/test_gh668.py
@@ -0,0 +1,37 @@
+"""Integration test for gh-668.
+
+Ensure that static route to host is working correctly.
+The original problem is specific to the ENI renderer but that test is suitable
+for all network configuration outputs.
+"""
+
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+
+
+DESTINATION_IP = "172.16.0.10"
+GATEWAY_IP = "10.0.0.100"
+
+NETWORK_CONFIG = """\
+version: 2
+ethernets:
+ eth0:
+ addresses: [10.0.0.10/8]
+ dhcp4: false
+ routes:
+ - to: {}/32
+ via: {}
+""".format(DESTINATION_IP, GATEWAY_IP)
+
+EXPECTED_ROUTE = "{} via {}".format(DESTINATION_IP, GATEWAY_IP)
+
+
+@pytest.mark.lxd_container
+@pytest.mark.lxd_vm
+@pytest.mark.lxd_config_dict({
+ "user.network-config": NETWORK_CONFIG,
+})
+def test_static_route_to_host(client: IntegrationInstance):
+ route = client.execute("ip route | grep {}".format(DESTINATION_IP))
+ assert route.startswith(EXPECTED_ROUTE)
diff --git a/tests/integration_tests/bugs/test_gh671.py b/tests/integration_tests/bugs/test_gh671.py
new file mode 100644
index 00000000..5e90cdda
--- /dev/null
+++ b/tests/integration_tests/bugs/test_gh671.py
@@ -0,0 +1,55 @@
+"""Integration test for gh-671.
+
+Verify that on Azure that if a default user and password are specified
+through the Azure API that a change in the default password overwrites
+the old password
+"""
+
+import crypt
+
+import pytest
+
+from tests.integration_tests.clouds import IntegrationCloud
+
+OLD_PASSWORD = 'DoIM33tTheComplexityRequirements!??'
+NEW_PASSWORD = 'DoIM33tTheComplexityRequirementsNow!??'
+
+
+def _check_password(instance, unhashed_password):
+ shadow_password = instance.execute('getent shadow ubuntu').split(':')[1]
+ salt = shadow_password.rsplit('$', 1)[0]
+ hashed_password = crypt.crypt(unhashed_password, salt)
+ assert shadow_password == hashed_password
+
+
+@pytest.mark.azure
+@pytest.mark.sru_2020_11
+def test_update_default_password(setup_image, session_cloud: IntegrationCloud):
+ os_profile = {
+ 'os_profile': {
+ 'admin_password': '',
+ 'linux_configuration': {
+ 'disable_password_authentication': False
+ }
+ }
+ }
+ os_profile['os_profile']['admin_password'] = OLD_PASSWORD
+ instance1 = session_cloud.launch(launch_kwargs={'vm_params': os_profile})
+
+ _check_password(instance1, OLD_PASSWORD)
+
+ snapshot_id = instance1.cloud.cloud_instance.snapshot(
+ instance1.instance,
+ delete_provisioned_user=False
+ )
+
+ os_profile['os_profile']['admin_password'] = NEW_PASSWORD
+ try:
+ with session_cloud.launch(launch_kwargs={
+ 'image_id': snapshot_id,
+ 'vm_params': os_profile,
+ }) as instance2:
+ _check_password(instance2, NEW_PASSWORD)
+ finally:
+ session_cloud.cloud_instance.delete_image(snapshot_id)
+ instance1.destroy()
diff --git a/tests/integration_tests/bugs/test_lp1813396.py b/tests/integration_tests/bugs/test_lp1813396.py
new file mode 100644
index 00000000..7ad0e809
--- /dev/null
+++ b/tests/integration_tests/bugs/test_lp1813396.py
@@ -0,0 +1,34 @@
+"""Integration test for lp-1813396
+
+Ensure gpg is called with no tty flag.
+"""
+
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.log_utils import verify_ordered_items_in_text
+
+
+USER_DATA = """\
+#cloud-config
+apt:
+ sources:
+ cloudinit:
+ source: 'deb [arch=amd64] http://ppa.launchpad.net/cloud-init-dev/daily/ubuntu focal main'
+ keyserver: keyserver.ubuntu.com
+ keyid: E4D304DF
+""" # noqa: E501
+
+
+@pytest.mark.sru_2020_11
+@pytest.mark.user_data(USER_DATA)
+def test_gpg_no_tty(client: IntegrationInstance):
+ log = client.read_from_file('/var/log/cloud-init.log')
+ to_verify = [
+ "Running command ['gpg', '--no-tty', "
+ "'--keyserver=keyserver.ubuntu.com', '--recv-keys', 'E4D304DF'] "
+ "with allowed return codes [0] (shell=False, capture=True)",
+ "Imported key 'E4D304DF' from keyserver 'keyserver.ubuntu.com'",
+ "finish: modules-config/config-apt-configure: SUCCESS",
+ ]
+ verify_ordered_items_in_text(to_verify, log)
diff --git a/tests/integration_tests/bugs/test_lp1898997.py b/tests/integration_tests/bugs/test_lp1898997.py
new file mode 100644
index 00000000..54c88d82
--- /dev/null
+++ b/tests/integration_tests/bugs/test_lp1898997.py
@@ -0,0 +1,71 @@
+"""Integration test for LP: #1898997
+
+cloud-init was incorrectly excluding Open vSwitch bridge members from its list
+of interfaces. This meant that instances which had only one interface which
+was in an Open vSwitch bridge would not boot correctly: cloud-init would not
+find the expected physical interfaces, so would not apply network config.
+
+This test checks that cloud-init believes it has successfully applied the
+network configuration, and confirms that the bridge can be used to ping the
+default gateway.
+"""
+import pytest
+
+MAC_ADDRESS = "de:ad:be:ef:12:34"
+
+
+NETWORK_CONFIG = """\
+bridges:
+ ovs-br:
+ dhcp4: true
+ interfaces:
+ - enp5s0
+ macaddress: 52:54:00:d9:08:1c
+ mtu: 1500
+ openvswitch: {{}}
+ethernets:
+ enp5s0:
+ mtu: 1500
+ set-name: enp5s0
+ match:
+ macaddress: {}
+version: 2
+""".format(MAC_ADDRESS)
+
+
+@pytest.mark.lxd_config_dict({
+ "user.network-config": NETWORK_CONFIG,
+ "volatile.eth0.hwaddr": MAC_ADDRESS,
+})
+@pytest.mark.lxd_vm
+@pytest.mark.not_bionic
+@pytest.mark.not_xenial
+@pytest.mark.sru_2020_11
+@pytest.mark.ubuntu
+class TestInterfaceListingWithOpenvSwitch:
+ def test_ovs_member_interfaces_not_excluded(self, client):
+ # We need to install openvswitch for our provided network configuration
+ # to apply (on next boot), so DHCP on our default interface to fetch it
+ client.execute("dhclient enp5s0")
+ client.execute("apt update -qqy")
+ client.execute("apt-get install -qqy openvswitch-switch")
+
+ # Now our networking config should successfully apply on a clean reboot
+ client.execute("cloud-init clean --logs")
+ client.restart()
+
+ cloudinit_output = client.read_from_file("/var/log/cloud-init.log")
+
+ # Confirm that the network configuration was applied successfully
+ assert "WARN" not in cloudinit_output
+ # Confirm that the applied network config created the OVS bridge
+ assert "ovs-br" in client.execute("ip addr")
+
+ # Test that we can ping our gateway using our bridge
+ gateway = client.execute(
+ "ip -4 route show default | awk '{ print $3 }'"
+ )
+ ping_result = client.execute(
+ "ping -c 1 -W 1 -I ovs-br {}".format(gateway)
+ )
+ assert ping_result.ok
diff --git a/tests/integration_tests/bugs/test_lp1900837.py b/tests/integration_tests/bugs/test_lp1900837.py
index 3fe7d0d0..fcc2b751 100644
--- a/tests/integration_tests/bugs/test_lp1900837.py
+++ b/tests/integration_tests/bugs/test_lp1900837.py
@@ -22,7 +22,8 @@ class TestLogPermissionsNotResetOnReboot:
assert "600" == _get_log_perms(client)
# Reboot
- client.instance.restart()
+ client.restart()
+ assert client.execute('cloud-init status').ok
# Check that permissions are not reset on reboot
assert "600" == _get_log_perms(client)
diff --git a/tests/integration_tests/bugs/test_lp1910835.py b/tests/integration_tests/bugs/test_lp1910835.py
new file mode 100644
index 00000000..87f92d5e
--- /dev/null
+++ b/tests/integration_tests/bugs/test_lp1910835.py
@@ -0,0 +1,66 @@
+"""Integration test for LP: #1910835.
+
+If users do not provide an SSH key and instead ask Azure to generate a key for
+them, the key material available in the IMDS may include CRLF sequences. Prior
+to e56b55452549cb037da0a4165154ffa494e9678a, the Azure datasource handled keys
+via a certificate, the tooling for which removed these sequences. This test
+ensures that cloud-init does not regress support for this Azure behaviour.
+
+This test provides the SSH key configured for tests to the instance in two
+ways: firstly, with CRLFs to mimic the generated keys, via the Azure API;
+secondly, as user-data in unmodified form. This means that even on systems
+which exhibit the bug fetching the platform's metadata, we can SSH into the SUT
+to confirm this (instead of having to assert SSH failure; there are lots of
+reasons SSH might fail).
+
+Once SSH'd in, we check that the two keys in .ssh/authorized_keys have the same
+material: if the Azure datasource has removed the CRLFs correctly, then they
+will match.
+"""
+import pytest
+
+
+USER_DATA_TMPL = """\
+#cloud-config
+ssh_authorized_keys:
+ - {}"""
+
+
+@pytest.mark.sru_2021_01
+@pytest.mark.azure
+def test_crlf_in_azure_metadata_ssh_keys(session_cloud, setup_image):
+ authorized_keys_path = "/home/{}/.ssh/authorized_keys".format(
+ session_cloud.cloud_instance.username
+ )
+ # Pass in user-data to allow us to access the instance when the normal
+ # path fails
+ key_data = session_cloud.cloud_instance.key_pair.public_key_content
+ user_data = USER_DATA_TMPL.format(key_data)
+ # Throw a CRLF into the otherwise good key data, to emulate Azure's
+ # behaviour for generated keys
+ key_data = key_data[:20] + "\r\n" + key_data[20:]
+ vm_params = {
+ "os_profile": {
+ "linux_configuration": {
+ "ssh": {
+ "public_keys": [
+ {"path": authorized_keys_path, "key_data": key_data}
+ ]
+ }
+ }
+ }
+ }
+ with session_cloud.launch(
+ launch_kwargs={"vm_params": vm_params, "user_data": user_data}
+ ) as client:
+ authorized_keys = (
+ client.read_from_file(authorized_keys_path).strip().splitlines()
+ )
+ # We expect one key from the cloud, one from user-data
+ assert 2 == len(authorized_keys)
+ # And those two keys should be the same, except for a possible key
+ # comment, which Azure strips out
+ assert (
+ authorized_keys[0].rsplit(" ")[:2]
+ == authorized_keys[1].split(" ")[:2]
+ )
diff --git a/tests/integration_tests/clouds.py b/tests/integration_tests/clouds.py
index 88ac4408..9eebb10a 100644
--- a/tests/integration_tests/clouds.py
+++ b/tests/integration_tests/clouds.py
@@ -6,7 +6,7 @@ from pycloudlib import EC2, GCE, Azure, OCI, LXDContainer, LXDVirtualMachine
from pycloudlib.lxd.instance import LXDInstance
import cloudinit
-from cloudinit.subp import subp
+from cloudinit.subp import subp, ProcessExecutionError
from tests.integration_tests import integration_settings
from tests.integration_tests.instances import (
IntegrationEc2Instance,
@@ -25,6 +25,65 @@ except ImportError:
log = logging.getLogger('integration_testing')
+def _get_ubuntu_series() -> list:
+ """Use distro-info-data's ubuntu.csv to get a list of Ubuntu series"""
+ out = ""
+ try:
+ out, _err = subp(["ubuntu-distro-info", "-a"])
+ except ProcessExecutionError:
+ log.info(
+ "ubuntu-distro-info (from the distro-info package) must be"
+ " installed to guess Ubuntu os/release"
+ )
+ return out.splitlines()
+
+
+class ImageSpecification:
+ """A specification of an image to launch for testing.
+
+ If either of ``os`` and ``release`` are not specified, an attempt will be
+ made to infer the correct values for these on instantiation.
+
+ :param image_id:
+ The image identifier used by the rest of the codebase to launch this
+ image.
+ :param os:
+ An optional string describing the operating system this image is for
+ (e.g. "ubuntu", "rhel", "freebsd").
+ :param release:
+ A optional string describing the operating system release (e.g.
+ "focal", "8"; the exact values here will depend on the OS).
+ """
+
+ def __init__(
+ self,
+ image_id: str,
+ os: "Optional[str]" = None,
+ release: "Optional[str]" = None,
+ ):
+ if image_id in _get_ubuntu_series():
+ if os is None:
+ os = "ubuntu"
+ if release is None:
+ release = image_id
+
+ self.image_id = image_id
+ self.os = os
+ self.release = release
+ log.info(
+ "Detected image: image_id=%s os=%s release=%s",
+ self.image_id,
+ self.os,
+ self.release,
+ )
+
+ @classmethod
+ def from_os_image(cls):
+ """Return an ImageSpecification for integration_settings.OS_IMAGE."""
+ parts = integration_settings.OS_IMAGE.split("::", 2)
+ return cls(*parts)
+
+
class IntegrationCloud(ABC):
datasource = None # type: Optional[str]
integration_instance_cls = IntegrationInstance
@@ -32,7 +91,23 @@ class IntegrationCloud(ABC):
def __init__(self, settings=integration_settings):
self.settings = settings
self.cloud_instance = self._get_cloud_instance()
- self.image_id = self._get_initial_image()
+ if settings.PUBLIC_SSH_KEY is not None:
+ # If we have a non-default key, use it.
+ self.cloud_instance.use_key(
+ settings.PUBLIC_SSH_KEY, name=settings.KEYPAIR_NAME
+ )
+ elif settings.KEYPAIR_NAME is not None:
+ # Even if we're using the default key, it may still have a
+ # different name in the clouds, so we need to set it separately.
+ self.cloud_instance.key_pair.name = settings.KEYPAIR_NAME
+ self._released_image_id = self._get_initial_image()
+ self.snapshot_id = None
+
+ @property
+ def image_id(self):
+ if self.snapshot_id:
+ return self.snapshot_id
+ return self._released_image_id
def emit_settings_to_log(self) -> None:
log.info(
@@ -50,21 +125,20 @@ class IntegrationCloud(ABC):
raise NotImplementedError
def _get_initial_image(self):
- image_id = self.settings.OS_IMAGE
+ image = ImageSpecification.from_os_image()
try:
- image_id = self.cloud_instance.released_image(
- self.settings.OS_IMAGE)
+ return self.cloud_instance.released_image(image.image_id)
except (ValueError, IndexError):
- pass
- return image_id
+ return image.image_id
def _perform_launch(self, launch_kwargs):
pycloudlib_instance = self.cloud_instance.launch(**launch_kwargs)
- pycloudlib_instance.wait(raise_on_cloudinit_failure=False)
return pycloudlib_instance
def launch(self, user_data=None, launch_kwargs=None,
settings=integration_settings):
+ if launch_kwargs is None:
+ launch_kwargs = {}
if self.settings.EXISTING_INSTANCE_ID:
log.info(
'Not launching instance due to EXISTING_INSTANCE_ID. '
@@ -76,10 +150,8 @@ class IntegrationCloud(ABC):
kwargs = {
'image_id': self.image_id,
'user_data': user_data,
- 'wait': False,
}
- if launch_kwargs:
- kwargs.update(launch_kwargs)
+ kwargs.update(launch_kwargs)
log.info(
"Launching instance with launch_kwargs:\n{}".format(
"\n".join("{}={}".format(*item) for item in kwargs.items())
@@ -87,9 +159,18 @@ class IntegrationCloud(ABC):
)
pycloudlib_instance = self._perform_launch(kwargs)
-
log.info('Launched instance: %s', pycloudlib_instance)
- return self.get_instance(pycloudlib_instance, settings)
+ instance = self.get_instance(pycloudlib_instance, settings)
+ if kwargs.get('wait', True):
+ # If we aren't waiting, we can't rely on command execution here
+ log.info(
+ 'cloud-init version: %s',
+ instance.execute("cloud-init --version")
+ )
+ serial = instance.execute("grep serial /etc/cloud/build.info")
+ if serial:
+ log.info('image serial: %s', serial.split()[1])
+ return instance
def get_instance(self, cloud_instance, settings=integration_settings):
return self.integration_instance_cls(self, cloud_instance, settings)
@@ -100,6 +181,19 @@ class IntegrationCloud(ABC):
def snapshot(self, instance):
return self.cloud_instance.snapshot(instance, clean=True)
+ def delete_snapshot(self):
+ if self.snapshot_id:
+ if self.settings.KEEP_IMAGE:
+ log.info(
+ 'NOT deleting snapshot image created for this testrun '
+ 'because KEEP_IMAGE is True: %s', self.snapshot_id)
+ else:
+ log.info(
+ 'Deleting snapshot image created for this testrun: %s',
+ self.snapshot_id
+ )
+ self.cloud_instance.delete_image(self.snapshot_id)
+
class Ec2Cloud(IntegrationCloud):
datasource = 'ec2'
@@ -116,9 +210,6 @@ class GceCloud(IntegrationCloud):
def _get_cloud_instance(self):
return GCE(
tag='gce-integration-test',
- project=self.settings.GCE_PROJECT,
- region=self.settings.GCE_REGION,
- zone=self.settings.GCE_ZONE,
)
@@ -130,7 +221,14 @@ class AzureCloud(IntegrationCloud):
return Azure(tag='azure-integration-test')
def destroy(self):
- self.cloud_instance.delete_resource_group()
+ if self.settings.KEEP_INSTANCE:
+ log.info(
+ 'NOT deleting resource group because KEEP_INSTANCE is true '
+ 'and deleting resource group would also delete instance. '
+ 'Instance and resource group must both be manually deleted.'
+ )
+ else:
+ self.cloud_instance.delete_resource_group()
class OciCloud(IntegrationCloud):
@@ -139,8 +237,7 @@ class OciCloud(IntegrationCloud):
def _get_cloud_instance(self):
return OCI(
- tag='oci-integration-test',
- compartment_id=self.settings.OCI_COMPARTMENT_ID
+ tag='oci-integration-test'
)
@@ -174,7 +271,7 @@ class _LxdIntegrationCloud(IntegrationCloud):
def _perform_launch(self, launch_kwargs):
launch_kwargs['inst_type'] = launch_kwargs.pop('instance_type', None)
- launch_kwargs.pop('wait')
+ wait = launch_kwargs.pop('wait', True)
release = launch_kwargs.pop('image_id')
try:
@@ -190,8 +287,7 @@ class _LxdIntegrationCloud(IntegrationCloud):
)
if self.settings.CLOUD_INIT_SOURCE == 'IN_PLACE':
self._mount_source(pycloudlib_instance)
- pycloudlib_instance.start(wait=False)
- pycloudlib_instance.wait(raise_on_cloudinit_failure=False)
+ pycloudlib_instance.start(wait=wait)
return pycloudlib_instance
diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py
index 73b44bfc..99dd8d9e 100644
--- a/tests/integration_tests/conftest.py
+++ b/tests/integration_tests/conftest.py
@@ -1,18 +1,29 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import datetime
+import functools
import logging
-import os
import pytest
+import os
import sys
+from tarfile import TarFile
from contextlib import contextmanager
+from pathlib import Path
from tests.integration_tests import integration_settings
from tests.integration_tests.clouds import (
+ AzureCloud,
Ec2Cloud,
GceCloud,
- AzureCloud,
- OciCloud,
+ ImageSpecification,
+ IntegrationCloud,
LxdContainerCloud,
LxdVmCloud,
+ OciCloud,
+ _LxdIntegrationCloud,
+)
+from tests.integration_tests.instances import (
+ CloudInitSource,
+ IntegrationInstance,
)
@@ -28,6 +39,9 @@ platforms = {
'lxd_container': LxdContainerCloud,
'lxd_vm': LxdVmCloud,
}
+os_list = ["ubuntu"]
+
+session_start_time = datetime.datetime.now().strftime('%y%m%d%H%M%S')
def pytest_runtest_setup(item):
@@ -54,6 +68,18 @@ def pytest_runtest_setup(item):
if supported_platforms and current_platform not in supported_platforms:
pytest.skip(unsupported_message)
+ image = ImageSpecification.from_os_image()
+ current_os = image.os
+ supported_os_set = set(os_list).intersection(test_marks)
+ if current_os and supported_os_set and current_os not in supported_os_set:
+ pytest.skip("Cannot run on OS {}".format(current_os))
+ if 'unstable' in test_marks and not integration_settings.RUN_UNSTABLE:
+ pytest.skip('Test marked unstable. Manually remove mark to run it')
+
+ current_release = image.release
+ if "not_{}".format(current_release) in test_marks:
+ pytest.skip("Cannot run on release {}".format(current_release))
+
# disable_subp_usage is defined at a higher level, but we don't
# want it applied here
@@ -75,82 +101,137 @@ def session_cloud():
cloud = platforms[integration_settings.PLATFORM]()
cloud.emit_settings_to_log()
yield cloud
- cloud.destroy()
+ try:
+ cloud.delete_snapshot()
+ finally:
+ cloud.destroy()
+
+def get_validated_source(
+ session_cloud: IntegrationCloud,
+ source=integration_settings.CLOUD_INIT_SOURCE
+) -> CloudInitSource:
+ if source == 'NONE':
+ return CloudInitSource.NONE
+ elif source == 'IN_PLACE':
+ if session_cloud.datasource not in ['lxd_container', 'lxd_vm']:
+ raise ValueError(
+ 'IN_PLACE as CLOUD_INIT_SOURCE only works for LXD')
+ return CloudInitSource.IN_PLACE
+ elif source == 'PROPOSED':
+ return CloudInitSource.PROPOSED
+ elif source.startswith('ppa:'):
+ return CloudInitSource.PPA
+ elif os.path.isfile(str(source)):
+ return CloudInitSource.DEB_PACKAGE
+ raise ValueError(
+ 'Invalid value for CLOUD_INIT_SOURCE setting: {}'.format(source))
-@pytest.fixture(scope='session', autouse=True)
-def setup_image(session_cloud):
+
+@pytest.fixture(scope='session')
+def setup_image(session_cloud: IntegrationCloud):
"""Setup the target environment with the correct version of cloud-init.
So we can launch instances / run tests with the correct image
"""
- client = None
+
+ source = get_validated_source(session_cloud)
+ if not source.installs_new_version():
+ return
log.info('Setting up environment for %s', session_cloud.datasource)
- if integration_settings.CLOUD_INIT_SOURCE == 'NONE':
- pass # that was easy
- elif integration_settings.CLOUD_INIT_SOURCE == 'IN_PLACE':
- if session_cloud.datasource not in ['lxd_container', 'lxd_vm']:
- raise ValueError(
- 'IN_PLACE as CLOUD_INIT_SOURCE only works for LXD')
- # The mount needs to happen after the instance is created, so
- # no further action needed here
- elif integration_settings.CLOUD_INIT_SOURCE == 'PROPOSED':
- client = session_cloud.launch()
- client.install_proposed_image()
- elif integration_settings.CLOUD_INIT_SOURCE.startswith('ppa:'):
- client = session_cloud.launch()
- client.install_ppa(integration_settings.CLOUD_INIT_SOURCE)
- elif os.path.isfile(str(integration_settings.CLOUD_INIT_SOURCE)):
- client = session_cloud.launch()
- client.install_deb()
- else:
- raise ValueError(
- 'Invalid value for CLOUD_INIT_SOURCE setting: {}'.format(
- integration_settings.CLOUD_INIT_SOURCE))
- if client:
- # Even if we're keeping instances, we don't want to keep this
- # one around as it was just for image creation
- client.destroy()
+ client = session_cloud.launch()
+ client.install_new_cloud_init(source)
+ # Even if we're keeping instances, we don't want to keep this
+ # one around as it was just for image creation
+ client.destroy()
log.info('Done with environment setup')
+def _collect_logs(instance: IntegrationInstance, node_id: str,
+ test_failed: bool):
+ """Collect logs from remote instance.
+
+ Args:
+ instance: The current IntegrationInstance to collect logs from
+ node_id: The pytest representation of this test, E.g.:
+ tests/integration_tests/test_example.py::TestExample.test_example
+ test_failed: If test failed or not
+ """
+ if any([
+ integration_settings.COLLECT_LOGS == 'NEVER',
+ integration_settings.COLLECT_LOGS == 'ON_ERROR' and not test_failed
+ ]):
+ return
+ instance.execute(
+ 'cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz')
+ node_id_path = Path(
+ node_id
+ .replace('.py', '') # Having a directory with '.py' would be weird
+ .replace('::', os.path.sep) # Turn classes/tests into paths
+ .replace('[', '-') # For parametrized names
+ .replace(']', '') # For parameterized names
+ )
+ log_dir = Path(
+ integration_settings.LOCAL_LOG_PATH
+ ) / session_start_time / node_id_path
+ log.info("Writing logs to %s", log_dir)
+ if not log_dir.exists():
+ log_dir.mkdir(parents=True)
+ tarball_path = log_dir / 'cloud-init.tar.gz'
+ instance.pull_file('/var/tmp/cloud-init.tar.gz', tarball_path)
+
+ tarball = TarFile.open(str(tarball_path))
+ tarball.extractall(path=str(log_dir))
+ tarball_path.unlink()
+
+
@contextmanager
-def _client(request, fixture_utils, session_cloud):
+def _client(request, fixture_utils, session_cloud: IntegrationCloud):
"""Fixture implementation for the client fixtures.
Launch the dynamic IntegrationClient instance using any provided
userdata, yield to the test, then cleanup
"""
- user_data = fixture_utils.closest_marker_first_arg_or(
- request, 'user_data', None)
- name = fixture_utils.closest_marker_first_arg_or(
- request, 'instance_name', None
+ getter = functools.partial(
+ fixture_utils.closest_marker_first_arg_or, request, default=None
)
+ user_data = getter('user_data')
+ name = getter('instance_name')
+ lxd_config_dict = getter('lxd_config_dict')
+
launch_kwargs = {}
if name is not None:
- launch_kwargs = {"name": name}
+ launch_kwargs["name"] = name
+ if lxd_config_dict is not None:
+ if not isinstance(session_cloud, _LxdIntegrationCloud):
+ pytest.skip("lxd_config_dict requires LXD")
+ launch_kwargs["config_dict"] = lxd_config_dict
+
with session_cloud.launch(
user_data=user_data, launch_kwargs=launch_kwargs
) as instance:
+ previous_failures = request.session.testsfailed
yield instance
+ test_failed = request.session.testsfailed - previous_failures > 0
+ _collect_logs(instance, request.node.nodeid, test_failed)
@pytest.yield_fixture
-def client(request, fixture_utils, session_cloud):
+def client(request, fixture_utils, session_cloud, setup_image):
"""Provide a client that runs for every test."""
with _client(request, fixture_utils, session_cloud) as client:
yield client
@pytest.yield_fixture(scope='module')
-def module_client(request, fixture_utils, session_cloud):
+def module_client(request, fixture_utils, session_cloud, setup_image):
"""Provide a client that runs once per module."""
with _client(request, fixture_utils, session_cloud) as client:
yield client
@pytest.yield_fixture(scope='class')
-def class_client(request, fixture_utils, session_cloud):
+def class_client(request, fixture_utils, session_cloud, setup_image):
"""Provide a client that runs once per class."""
with _client(request, fixture_utils, session_cloud) as client:
yield client
@@ -180,3 +261,20 @@ def pytest_assertrepr_compare(op, left, right):
'"{}" not in cloud-init.log string; unexpectedly found on'
" these lines:".format(left)
] + found_lines
+
+
+def pytest_configure(config):
+ """Perform initial configuration, before the test runs start.
+
+ This hook is only called if integration tests are being executed, so we can
+ use it to configure defaults for integration testing that differ from the
+ rest of the tests in the codebase.
+
+ See
+ https://docs.pytest.org/en/latest/reference.html#_pytest.hookspec.pytest_configure
+ for pytest's documentation.
+ """
+ if "log_cli_level" in config.option and not config.option.log_cli_level:
+ # If log_cli_level is available in this version of pytest and not set
+ # to anything, set it to INFO.
+ config.option.log_cli_level = "INFO"
diff --git a/tests/integration_tests/instances.py b/tests/integration_tests/instances.py
index 9b13288c..0d1e1aef 100644
--- a/tests/integration_tests/instances.py
+++ b/tests/integration_tests/instances.py
@@ -1,4 +1,5 @@
# This file is part of cloud-init. See LICENSE file for license information.
+from enum import Enum
import logging
import os
import uuid
@@ -25,9 +26,27 @@ def _get_tmp_path():
return '/var/tmp/{}.tmp'.format(tmp_filename)
-class IntegrationInstance:
- use_sudo = True
+class CloudInitSource(Enum):
+ """Represents the cloud-init image source setting as a defined value.
+
+ Values here represent all possible values for CLOUD_INIT_SOURCE in
+ tests/integration_tests/integration_settings.py. See that file for an
+ explanation of these values. If the value set there can't be parsed into
+ one of these values, an exception will be raised
+ """
+ NONE = 1
+ IN_PLACE = 2
+ PROPOSED = 3
+ PPA = 4
+ DEB_PACKAGE = 5
+
+ def installs_new_version(self):
+ if self.name in [self.NONE.name, self.IN_PLACE.name]:
+ return False
+ return True
+
+class IntegrationInstance:
def __init__(self, cloud: 'IntegrationCloud', instance: BaseInstance,
settings=integration_settings):
self.cloud = cloud
@@ -37,24 +56,30 @@ class IntegrationInstance:
def destroy(self):
self.instance.delete()
- def execute(self, command, *, use_sudo=None) -> Result:
+ def restart(self):
+ """Restart this instance (via cloud mechanism) and wait for boot.
+
+ This wraps pycloudlib's `BaseInstance.restart`
+ """
+ log.info("Restarting instance and waiting for boot")
+ self.instance.restart()
+
+ def execute(self, command, *, use_sudo=True) -> Result:
if self.instance.username == 'root' and use_sudo is False:
raise Exception('Root user cannot run unprivileged')
- if use_sudo is None:
- use_sudo = self.use_sudo
return self.instance.execute(command, use_sudo=use_sudo)
def pull_file(self, remote_path, local_path):
# First copy to a temporary directory because of permissions issues
tmp_path = _get_tmp_path()
- self.instance.execute('cp {} {}'.format(remote_path, tmp_path))
- self.instance.pull_file(tmp_path, local_path)
+ self.instance.execute('cp {} {}'.format(str(remote_path), tmp_path))
+ self.instance.pull_file(tmp_path, str(local_path))
def push_file(self, local_path, remote_path):
# First push to a temporary directory because of permissions issues
tmp_path = _get_tmp_path()
- self.instance.push_file(local_path, tmp_path)
- self.execute('mv {} {}'.format(tmp_path, remote_path))
+ self.instance.push_file(str(local_path), tmp_path)
+ self.execute('mv {} {}'.format(tmp_path, str(remote_path)))
def read_from_file(self, remote_path) -> str:
result = self.execute('cat {}'.format(remote_path))
@@ -83,36 +108,52 @@ class IntegrationInstance:
os.unlink(tmp_file.name)
def snapshot(self):
- return self.cloud.snapshot(self.instance)
-
- def _install_new_cloud_init(self, remote_script):
- self.execute(remote_script)
+ image_id = self.cloud.snapshot(self.instance)
+ log.info('Created new image: %s', image_id)
+ return image_id
+
+ def install_new_cloud_init(
+ self,
+ source: CloudInitSource,
+ take_snapshot=True
+ ):
+ if source == CloudInitSource.DEB_PACKAGE:
+ self.install_deb()
+ elif source == CloudInitSource.PPA:
+ self.install_ppa()
+ elif source == CloudInitSource.PROPOSED:
+ self.install_proposed_image()
+ else:
+ raise Exception(
+ "Specified to install {} which isn't supported here".format(
+ source)
+ )
version = self.execute('cloud-init -v').split()[-1]
log.info('Installed cloud-init version: %s', version)
self.instance.clean()
- image_id = self.snapshot()
- log.info('Created new image: %s', image_id)
- self.cloud.image_id = image_id
+ if take_snapshot:
+ snapshot_id = self.snapshot()
+ self.cloud.snapshot_id = snapshot_id
def install_proposed_image(self):
log.info('Installing proposed image')
remote_script = (
- '{sudo} echo deb "http://archive.ubuntu.com/ubuntu '
+ 'echo deb "http://archive.ubuntu.com/ubuntu '
'$(lsb_release -sc)-proposed main" | '
- '{sudo} tee /etc/apt/sources.list.d/proposed.list\n'
- '{sudo} apt-get update -q\n'
- '{sudo} apt-get install -qy cloud-init'
- ).format(sudo='sudo' if self.use_sudo else '')
- self._install_new_cloud_init(remote_script)
+ 'tee /etc/apt/sources.list.d/proposed.list\n'
+ 'apt-get update -q\n'
+ 'apt-get install -qy cloud-init'
+ )
+ self.execute(remote_script)
- def install_ppa(self, repo):
+ def install_ppa(self):
log.info('Installing PPA')
remote_script = (
- '{sudo} add-apt-repository {repo} -y && '
- '{sudo} apt-get update -q && '
- '{sudo} apt-get install -qy cloud-init'
- ).format(sudo='sudo' if self.use_sudo else '', repo=repo)
- self._install_new_cloud_init(remote_script)
+ 'add-apt-repository {repo} -y && '
+ 'apt-get update -q && '
+ 'apt-get install -qy cloud-init'
+ ).format(repo=self.settings.CLOUD_INIT_SOURCE)
+ self.execute(remote_script)
def install_deb(self):
log.info('Installing deb package')
@@ -122,9 +163,8 @@ class IntegrationInstance:
self.push_file(
local_path=integration_settings.CLOUD_INIT_SOURCE,
remote_path=remote_path)
- remote_script = '{sudo} dpkg -i {path}'.format(
- sudo='sudo' if self.use_sudo else '', path=remote_path)
- self._install_new_cloud_init(remote_script)
+ remote_script = 'dpkg -i {path}'.format(path=remote_path)
+ self.execute(remote_script)
def __enter__(self):
return self
@@ -151,4 +191,4 @@ class IntegrationOciInstance(IntegrationInstance):
class IntegrationLxdInstance(IntegrationInstance):
- use_sudo = False
+ pass
diff --git a/tests/integration_tests/integration_settings.py b/tests/integration_tests/integration_settings.py
index a0609f7e..22b4fdda 100644
--- a/tests/integration_tests/integration_settings.py
+++ b/tests/integration_tests/integration_settings.py
@@ -1,15 +1,22 @@
# This file is part of cloud-init. See LICENSE file for license information.
import os
+from distutils.util import strtobool
+
##################################################################
# LAUNCH SETTINGS
##################################################################
# Keep instance (mostly for debugging) when test is finished
KEEP_INSTANCE = False
+# Keep snapshot image (mostly for debugging) when test is finished
+KEEP_IMAGE = False
+# Run tests marked as unstable. Expect failures and dragons.
+RUN_UNSTABLE = False
# One of:
# lxd_container
+# lxd_vm
# azure
# ec2
# gce
@@ -21,8 +28,11 @@ PLATFORM = 'lxd_container'
INSTANCE_TYPE = None
# Determines the base image to use or generate new images from.
-# Can be the name of the OS if running a stock image,
-# otherwise the id of the image being used if using a custom image
+#
+# This can be the name of an Ubuntu release, or in the format
+# <image_id>[::<os>[::<release>]]. If given, os and release should describe
+# the image specified by image_id. (Ubuntu releases are converted to this
+# format internally; in this case, to "focal::ubuntu::focal".)
OS_IMAGE = 'focal'
# Populate if you want to use a pre-launched instance instead of
@@ -55,23 +65,27 @@ EXISTING_INSTANCE_ID = None
# A path to a valid package to be uploaded and installed
CLOUD_INIT_SOURCE = 'NONE'
+# Before an instance is torn down, we run `cloud-init collect-logs`
+# and transfer them locally. These settings specify when to collect these
+# logs and where to put them on the local filesystem
+# One of:
+# 'ALWAYS'
+# 'ON_ERROR'
+# 'NEVER'
+COLLECT_LOGS = 'ON_ERROR'
+LOCAL_LOG_PATH = '/tmp/cloud_init_test_logs'
+
##################################################################
-# GCE SPECIFIC SETTINGS
+# SSH KEY SETTINGS
##################################################################
-# Required for GCE
-GCE_PROJECT = None
-# You probably want to override these
-GCE_REGION = 'us-central1'
-GCE_ZONE = 'a'
+# A path to the public SSH key to use for test runs. (Defaults to pycloudlib's
+# default behaviour, using ~/.ssh/id_rsa.pub.)
+PUBLIC_SSH_KEY = None
-##################################################################
-# OCI SPECIFIC SETTINGS
-##################################################################
-# Compartment-id found at
-# https://console.us-phoenix-1.oraclecloud.com/a/identity/compartments
-# Required for Oracle
-OCI_COMPARTMENT_ID = None
+# For clouds which use named keypairs for SSH connection, the name that is used
+# for the keypair. (Defaults to pycloudlib's default behaviour.)
+KEYPAIR_NAME = None
##################################################################
# USER SETTINGS OVERRIDES
@@ -91,6 +105,12 @@ except ImportError:
# Perhaps a bit too hacky, but it works :)
current_settings = [var for var in locals() if var.isupper()]
for setting in current_settings:
- globals()[setting] = os.getenv(
+ env_setting = os.getenv(
'CLOUD_INIT_{}'.format(setting), globals()[setting]
)
+ if isinstance(env_setting, str):
+ try:
+ env_setting = bool(strtobool(env_setting.strip()))
+ except ValueError:
+ pass
+ globals()[setting] = env_setting
diff --git a/tests/integration_tests/log_utils.py b/tests/integration_tests/log_utils.py
new file mode 100644
index 00000000..40baae7b
--- /dev/null
+++ b/tests/integration_tests/log_utils.py
@@ -0,0 +1,11 @@
+def verify_ordered_items_in_text(to_verify: list, text: str):
+ """Assert all items in list appear in order in text.
+
+ Examples:
+ verify_ordered_items_in_text(['a', '1'], 'ab1') # passes
+ verify_ordered_items_in_text(['1', 'a'], 'ab1') # raises AssertionError
+ """
+ index = 0
+ for item in to_verify:
+ index = text[index:].find(item)
+ assert index > -1, "Expected item not found: '{}'".format(item)
diff --git a/tests/integration_tests/modules/test_apt_configure_sources_list.py b/tests/integration_tests/modules/test_apt_configure_sources_list.py
index d2bcc61a..28cbe19f 100644
--- a/tests/integration_tests/modules/test_apt_configure_sources_list.py
+++ b/tests/integration_tests/modules/test_apt_configure_sources_list.py
@@ -40,6 +40,7 @@ EXPECTED_REGEXES = [
@pytest.mark.ci
+@pytest.mark.ubuntu
class TestAptConfigureSourcesList:
@pytest.mark.user_data(USER_DATA)
diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py
new file mode 100644
index 00000000..89c01a9c
--- /dev/null
+++ b/tests/integration_tests/modules/test_ca_certs.py
@@ -0,0 +1,91 @@
+"""Integration tests for cc_ca_certs.
+
+(This is ported from ``tests/cloud_tests//testcases/modules/ca_certs.yaml``.)
+
+TODO:
+* Mark this as running on Debian and Alpine (once we have marks for that)
+* Implement testing for the RHEL-specific paths
+"""
+import os.path
+
+import pytest
+
+
+USER_DATA = """\
+#cloud-config
+ca-certs:
+ remove-defaults: true
+ trusted:
+ - |
+ -----BEGIN CERTIFICATE-----
+ MIIGJzCCBA+gAwIBAgIBATANBgkqhkiG9w0BAQUFADCBsjELMAkGA1UEBhMCRlIx
+ DzANBgNVBAgMBkFsc2FjZTETMBEGA1UEBwwKU3RyYXNib3VyZzEYMBYGA1UECgwP
+ d3d3LmZyZWVsYW4ub3JnMRAwDgYDVQQLDAdmcmVlbGFuMS0wKwYDVQQDDCRGcmVl
+ bGFuIFNhbXBsZSBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkxIjAgBgkqhkiG9w0BCQEW
+ E2NvbnRhY3RAZnJlZWxhbi5vcmcwHhcNMTIwNDI3MTAzMTE4WhcNMjIwNDI1MTAz
+ MTE4WjB+MQswCQYDVQQGEwJGUjEPMA0GA1UECAwGQWxzYWNlMRgwFgYDVQQKDA93
+ d3cuZnJlZWxhbi5vcmcxEDAOBgNVBAsMB2ZyZWVsYW4xDjAMBgNVBAMMBWFsaWNl
+ MSIwIAYJKoZIhvcNAQkBFhNjb250YWN0QGZyZWVsYW4ub3JnMIICIjANBgkqhkiG
+ 9w0BAQEFAAOCAg8AMIICCgKCAgEA3W29+ID6194bH6ejLrIC4hb2Ugo8v6ZC+Mrc
+ k2dNYMNPjcOKABvxxEtBamnSaeU/IY7FC/giN622LEtV/3oDcrua0+yWuVafyxmZ
+ yTKUb4/GUgafRQPf/eiX9urWurtIK7XgNGFNUjYPq4dSJQPPhwCHE/LKAykWnZBX
+ RrX0Dq4XyApNku0IpjIjEXH+8ixE12wH8wt7DEvdO7T3N3CfUbaITl1qBX+Nm2Z6
+ q4Ag/u5rl8NJfXg71ZmXA3XOj7zFvpyapRIZcPmkvZYn7SMCp8dXyXHPdpSiIWL2
+ uB3KiO4JrUYvt2GzLBUThp+lNSZaZ/Q3yOaAAUkOx+1h08285Pi+P8lO+H2Xic4S
+ vMq1xtLg2bNoPC5KnbRfuFPuUD2/3dSiiragJ6uYDLOyWJDivKGt/72OVTEPAL9o
+ 6T2pGZrwbQuiFGrGTMZOvWMSpQtNl+tCCXlT4mWqJDRwuMGrI4DnnGzt3IKqNwS4
+ Qyo9KqjMIPwnXZAmWPm3FOKe4sFwc5fpawKO01JZewDsYTDxVj+cwXwFxbE2yBiF
+ z2FAHwfopwaH35p3C6lkcgP2k/zgAlnBluzACUI+MKJ/G0gv/uAhj1OHJQ3L6kn1
+ SpvQ41/ueBjlunExqQSYD7GtZ1Kg8uOcq2r+WISE3Qc9MpQFFkUVllmgWGwYDuN3
+ Zsez95kCAwEAAaN7MHkwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0EHxYdT3BlblNT
+ TCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0OBBYEFFlfyRO6G8y5qEFKikl5
+ ajb2fT7XMB8GA1UdIwQYMBaAFCNsLT0+KV14uGw+quK7Lh5sh/JTMA0GCSqGSIb3
+ DQEBBQUAA4ICAQAT5wJFPqervbja5+90iKxi1d0QVtVGB+z6aoAMuWK+qgi0vgvr
+ mu9ot2lvTSCSnRhjeiP0SIdqFMORmBtOCFk/kYDp9M/91b+vS+S9eAlxrNCB5VOf
+ PqxEPp/wv1rBcE4GBO/c6HcFon3F+oBYCsUQbZDKSSZxhDm3mj7pb67FNbZbJIzJ
+ 70HDsRe2O04oiTx+h6g6pW3cOQMgIAvFgKN5Ex727K4230B0NIdGkzuj4KSML0NM
+ slSAcXZ41OoSKNjy44BVEZv0ZdxTDrRM4EwJtNyggFzmtTuV02nkUj1bYYYC5f0L
+ ADr6s0XMyaNk8twlWYlYDZ5uKDpVRVBfiGcq0uJIzIvemhuTrofh8pBQQNkPRDFT
+ Rq1iTo1Ihhl3/Fl1kXk1WR3jTjNb4jHX7lIoXwpwp767HAPKGhjQ9cFbnHMEtkro
+ RlJYdtRq5mccDtwT0GFyoJLLBZdHHMHJz0F9H7FNk2tTQQMhK5MVYwg+LIaee586
+ CQVqfbscp7evlgjLW98H+5zylRHAgoH2G79aHljNKMp9BOuq6SnEglEsiWGVtu2l
+ hnx8SB3sVJZHeer8f/UQQwqbAO+Kdy70NmbSaqaVtp8jOxLiidWkwSyRTsuU6D8i
+ DiH5uEqBXExjrj0FslxcVKdVj5glVcSmkLwZKbEU1OKwleT/iXFhvooWhQ==
+ -----END CERTIFICATE-----
+"""
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(USER_DATA)
+class TestCaCerts:
+ def test_certs_updated(self, class_client):
+ """Test that /etc/ssl/certs is updated as we expect."""
+ root = "/etc/ssl/certs"
+ filenames = class_client.execute(["ls", "-1", root]).splitlines()
+ unlinked_files = []
+ links = {}
+ for filename in filenames:
+ full_path = os.path.join(root, filename)
+ symlink_target = class_client.execute(["readlink", full_path])
+ is_symlink = symlink_target.ok
+ if is_symlink:
+ links[filename] = symlink_target
+ else:
+ unlinked_files.append(filename)
+
+ assert ["ca-certificates.crt"] == unlinked_files
+ assert "cloud-init-ca-certs.pem" == links["a535c1f3.0"]
+ assert (
+ "/usr/share/ca-certificates/cloud-init-ca-certs.crt"
+ == links["cloud-init-ca-certs.pem"]
+ )
+
+ def test_cert_installed(self, class_client):
+ """Test that our specified cert has been installed"""
+ checksum = class_client.execute(
+ "sha256sum /etc/ssl/certs/ca-certificates.crt"
+ )
+ assert (
+ "78e875f18c73c1aab9167ae0bd323391e52222cc2dbcda42d129537219300062"
+ in checksum
+ )
diff --git a/tests/integration_tests/modules/test_cli.py b/tests/integration_tests/modules/test_cli.py
new file mode 100644
index 00000000..3f41b34d
--- /dev/null
+++ b/tests/integration_tests/modules/test_cli.py
@@ -0,0 +1,45 @@
+"""Integration tests for CLI functionality
+
+These would be for behavior manually invoked by user from the command line
+"""
+
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+
+
+VALID_USER_DATA = """\
+#cloud-config
+runcmd:
+ - echo 'hi' > /var/tmp/test
+"""
+
+INVALID_USER_DATA = """\
+runcmd:
+ - echo 'hi' > /var/tmp/test
+"""
+
+
+@pytest.mark.sru_2020_11
+@pytest.mark.user_data(VALID_USER_DATA)
+def test_valid_userdata(client: IntegrationInstance):
+ """Test `cloud-init devel schema` with valid userdata.
+
+ PR #575
+ """
+ result = client.execute('cloud-init devel schema --system')
+ assert result.ok
+ assert 'Valid cloud-config: system userdata' == result.stdout.strip()
+
+
+@pytest.mark.sru_2020_11
+@pytest.mark.user_data(INVALID_USER_DATA)
+def test_invalid_userdata(client: IntegrationInstance):
+ """Test `cloud-init devel schema` with invalid userdata.
+
+ PR #575
+ """
+ result = client.execute('cloud-init devel schema --system')
+ assert not result.ok
+ assert 'Cloud config schema errors' in result.stderr
+ assert 'needs to begin with "#cloud-config"' in result.stderr
diff --git a/tests/integration_tests/modules/test_lxd_bridge.py b/tests/integration_tests/modules/test_lxd_bridge.py
new file mode 100644
index 00000000..cbf11179
--- /dev/null
+++ b/tests/integration_tests/modules/test_lxd_bridge.py
@@ -0,0 +1,48 @@
+"""Integration tests for LXD bridge creation.
+
+(This is ported from
+``tests/cloud_tests/testcases/modules/lxd_bridge.yaml``.)
+"""
+import pytest
+import yaml
+
+
+USER_DATA = """\
+#cloud-config
+lxd:
+ init:
+ storage_backend: dir
+ bridge:
+ mode: new
+ name: lxdbr0
+ ipv4_address: 10.100.100.1
+ ipv4_netmask: 24
+ ipv4_dhcp_first: 10.100.100.100
+ ipv4_dhcp_last: 10.100.100.200
+ ipv4_nat: true
+ domain: lxd
+"""
+
+
+@pytest.mark.no_container
+@pytest.mark.user_data(USER_DATA)
+class TestLxdBridge:
+
+ @pytest.mark.parametrize("binary_name", ["lxc", "lxd"])
+ def test_binaries_installed(self, class_client, binary_name):
+ """Check that the expected LXD binaries are installed"""
+ assert class_client.execute(["which", binary_name]).ok
+
+ @pytest.mark.not_xenial
+ @pytest.mark.sru_2020_11
+ def test_bridge(self, class_client):
+ """Check that the given bridge is configured"""
+ cloud_init_log = class_client.read_from_file("/var/log/cloud-init.log")
+ assert "WARN" not in cloud_init_log
+
+ # The bridge should exist
+ assert class_client.execute("ip addr show lxdbr0")
+
+ raw_network_config = class_client.execute("lxc network show lxdbr0")
+ network_config = yaml.safe_load(raw_network_config)
+ assert "10.100.100.1/24" == network_config["config"]["ipv4.address"]
diff --git a/tests/integration_tests/modules/test_package_update_upgrade_install.py b/tests/integration_tests/modules/test_package_update_upgrade_install.py
index 8a38ad84..28d741bc 100644
--- a/tests/integration_tests/modules/test_package_update_upgrade_install.py
+++ b/tests/integration_tests/modules/test_package_update_upgrade_install.py
@@ -26,6 +26,7 @@ package_upgrade: true
"""
+@pytest.mark.ubuntu
@pytest.mark.user_data(USER_DATA)
class TestPackageUpdateUpgradeInstall:
diff --git a/tests/integration_tests/modules/test_power_state_change.py b/tests/integration_tests/modules/test_power_state_change.py
new file mode 100644
index 00000000..32dfc86d
--- /dev/null
+++ b/tests/integration_tests/modules/test_power_state_change.py
@@ -0,0 +1,90 @@
+"""Integration test of the cc_power_state_change module.
+
+Test that the power state config options work as expected.
+"""
+
+import time
+
+import pytest
+
+from tests.integration_tests.clouds import IntegrationCloud
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.log_utils import verify_ordered_items_in_text
+
+USER_DATA = """\
+#cloud-config
+power_state:
+ delay: {delay}
+ mode: {mode}
+ message: msg
+ timeout: {timeout}
+ condition: {condition}
+"""
+
+
+def _detect_reboot(instance: IntegrationInstance):
+ # We'll wait for instance up here, but we don't know if we're
+ # detecting the first boot or second boot, so we also check
+ # the logs to ensure we've booted twice. If the logs show we've
+ # only booted once, wait until we've booted twice
+ instance.instance.wait()
+ for _ in range(600):
+ try:
+ log = instance.read_from_file('/var/log/cloud-init.log')
+ boot_count = log.count("running 'init-local'")
+ if boot_count == 1:
+ instance.instance.wait()
+ elif boot_count > 1:
+ break
+ except Exception:
+ pass
+ time.sleep(1)
+ else:
+ raise Exception('Could not detect reboot')
+
+
+def _can_connect(instance):
+ return instance.execute('true').ok
+
+
+# This test is marked unstable because even though it should be able to
+# run anywhere, I can only get it to run in an lxd container, and even then
+# occasionally some timing issues will crop up.
+@pytest.mark.unstable
+@pytest.mark.sru_2020_11
+@pytest.mark.ubuntu
+@pytest.mark.lxd_container
+class TestPowerChange:
+ @pytest.mark.parametrize('mode,delay,timeout,expected', [
+ ('poweroff', 'now', '10', 'will execute: shutdown -P now msg'),
+ ('reboot', 'now', '0', 'will execute: shutdown -r now msg'),
+ ('halt', '+1', '0', 'will execute: shutdown -H +1 msg'),
+ ])
+ def test_poweroff(self, session_cloud: IntegrationCloud,
+ mode, delay, timeout, expected):
+ with session_cloud.launch(
+ user_data=USER_DATA.format(
+ delay=delay, mode=mode, timeout=timeout, condition='true'),
+ wait=False
+ ) as instance:
+ if mode == 'reboot':
+ _detect_reboot(instance)
+ else:
+ instance.instance.wait_for_stop()
+ instance.instance.start(wait=True)
+ log = instance.read_from_file('/var/log/cloud-init.log')
+ assert _can_connect(instance)
+ lines_to_check = [
+ 'Running module power-state-change',
+ expected,
+ "running 'init-local'",
+ 'config-power-state-change already ran',
+ ]
+ verify_ordered_items_in_text(lines_to_check, log)
+
+ @pytest.mark.user_data(USER_DATA.format(delay='0', mode='poweroff',
+ timeout='0', condition='false'))
+ def test_poweroff_false_condition(self, client: IntegrationInstance):
+ log = client.read_from_file('/var/log/cloud-init.log')
+ assert _can_connect(client)
+ assert 'Condition was false. Will not perform state change' in log
diff --git a/tests/integration_tests/modules/test_seed_random_data.py b/tests/integration_tests/modules/test_seed_random_data.py
index b365fa98..f6a67c19 100644
--- a/tests/integration_tests/modules/test_seed_random_data.py
+++ b/tests/integration_tests/modules/test_seed_random_data.py
@@ -25,4 +25,4 @@ class TestSeedRandomData:
@pytest.mark.user_data(USER_DATA)
def test_seed_random_data(self, client):
seed_output = client.read_from_file("/root/seed")
- assert seed_output.strip() == "MYUb34023nD:LFDK10913jk;dfnk:Df"
+ assert seed_output.startswith("MYUb34023nD:LFDK10913jk;dfnk:Df")
diff --git a/tests/integration_tests/modules/test_snap.py b/tests/integration_tests/modules/test_snap.py
index b626f6b0..481edbaa 100644
--- a/tests/integration_tests/modules/test_snap.py
+++ b/tests/integration_tests/modules/test_snap.py
@@ -20,6 +20,7 @@ snap:
@pytest.mark.ci
+@pytest.mark.ubuntu
class TestSnap:
@pytest.mark.user_data(USER_DATA)
diff --git a/tests/integration_tests/modules/test_ssh_import_id.py b/tests/integration_tests/modules/test_ssh_import_id.py
index 45d37d6c..3db573b5 100644
--- a/tests/integration_tests/modules/test_ssh_import_id.py
+++ b/tests/integration_tests/modules/test_ssh_import_id.py
@@ -3,6 +3,10 @@
This test specifies ssh keys to be imported by the ``ssh_import_id`` module
and then checks that if the ssh keys were successfully imported.
+TODO:
+* This test assumes that SSH keys will be imported into the /home/ubuntu; this
+ will need modification to run on other OSes.
+
(This is ported from
``tests/cloud_tests/testcases/modules/ssh_import_id.yaml``.)"""
@@ -18,6 +22,7 @@ ssh_import_id:
@pytest.mark.ci
+@pytest.mark.ubuntu
class TestSshImportId:
@pytest.mark.user_data(USER_DATA)
diff --git a/tests/integration_tests/modules/test_ssh_keys_provided.py b/tests/integration_tests/modules/test_ssh_keys_provided.py
index 27d193c1..6aae96ae 100644
--- a/tests/integration_tests/modules/test_ssh_keys_provided.py
+++ b/tests/integration_tests/modules/test_ssh_keys_provided.py
@@ -83,66 +83,78 @@ ssh_keys:
@pytest.mark.user_data(USER_DATA)
class TestSshKeysProvided:
- def test_ssh_dsa_keys_provided(self, class_client):
- """Test dsa public key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_dsa_key.pub")
- assert (
- "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R"
- "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM") in out
-
- """Test dsa private key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_dsa_key")
- assert (
- "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr"
- "hOVAfzZ6+jklP") in out
-
- def test_ssh_rsa_keys_provided(self, class_client):
- """Test rsa public key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key.pub")
- assert (
- "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT"
- "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4") in out
-
- """Test rsa private key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key")
- assert (
- "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un"
- "RQvLZpMRdywBm") in out
-
- def test_ssh_rsa_certificate_provided(self, class_client):
- """Test rsa certificate was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key-cert.pub")
- assert (
- "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg"
- "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD") in out
-
- def test_ssh_certificate_updated_sshd_config(self, class_client):
- """Test ssh certificate was added to /etc/ssh/sshd_config."""
- out = class_client.read_from_file("/etc/ssh/sshd_config").strip()
- assert "HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub" in out
-
- def test_ssh_ecdsa_keys_provided(self, class_client):
- """Test ecdsa public key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_ecdsa_key.pub")
- assert (
- "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB"
- "BBFsS5Tvky/IC/dXhE/afxxU") in out
-
- """Test ecdsa private key generated."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_ecdsa_key")
- assert (
- "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY"
- "5mpZqxgX4vcgb") in out
-
- def test_ssh_ed25519_keys_provided(self, class_client):
- """Test ed25519 public key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_ed25519_key.pub")
- assert (
- "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6"
- "G15dqjQ2XkNVOEnb5") in out
-
- """Test ed25519 private key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_ed25519_key")
- assert (
- "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT"
- "OhteXao0Nl5DVThJ2+Q") in out
+ @pytest.mark.parametrize(
+ "config_path,expected_out",
+ (
+ (
+ "/etc/ssh/ssh_host_dsa_key.pub",
+ (
+ "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R"
+ "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM"
+ ),
+ ),
+ (
+ "/etc/ssh/ssh_host_dsa_key",
+ (
+ "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr"
+ "hOVAfzZ6+jklP"
+ ),
+ ),
+ (
+ "/etc/ssh/ssh_host_rsa_key.pub",
+ (
+ "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT"
+ "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4"
+ ),
+ ),
+ (
+ "/etc/ssh/ssh_host_rsa_key",
+ (
+ "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un"
+ "RQvLZpMRdywBm"
+ ),
+ ),
+ (
+ "/etc/ssh/ssh_host_rsa_key-cert.pub",
+ (
+ "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg"
+ "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD"
+ ),
+ ),
+ (
+ "/etc/ssh/sshd_config",
+ "HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub",
+ ),
+ (
+ "/etc/ssh/ssh_host_ecdsa_key.pub",
+ (
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB"
+ "BBFsS5Tvky/IC/dXhE/afxxU"
+ ),
+ ),
+ (
+ "/etc/ssh/ssh_host_ecdsa_key",
+ (
+ "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY"
+ "5mpZqxgX4vcgb"
+ ),
+ ),
+ (
+ "/etc/ssh/ssh_host_ed25519_key.pub",
+ (
+ "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6"
+ "G15dqjQ2XkNVOEnb5"
+ ),
+ ),
+ (
+ "/etc/ssh/ssh_host_ed25519_key",
+ (
+ "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT"
+ "OhteXao0Nl5DVThJ2+Q"
+ ),
+ ),
+ )
+ )
+ def test_ssh_provided_keys(self, config_path, expected_out, class_client):
+ out = class_client.read_from_file(config_path).strip()
+ assert expected_out in out
diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py
index 6a51f5a6..ee08d87b 100644
--- a/tests/integration_tests/modules/test_users_groups.py
+++ b/tests/integration_tests/modules/test_users_groups.py
@@ -2,6 +2,10 @@
This test specifies a number of users and groups via user-data, and confirms
that they have been configured correctly in the system under test.
+
+TODO:
+* This test assumes that the "ubuntu" user will be created when "default" is
+ specified; this will need modification to run on other OSes.
"""
import re
@@ -41,6 +45,7 @@ AHWYPYb2FT.lbioDm2RrkJPb9BZMN1O/
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestUsersGroups:
+ @pytest.mark.ubuntu
@pytest.mark.parametrize(
"getent_args,regex",
[
diff --git a/tests/integration_tests/test_upgrade.py b/tests/integration_tests/test_upgrade.py
new file mode 100644
index 00000000..233a574b
--- /dev/null
+++ b/tests/integration_tests/test_upgrade.py
@@ -0,0 +1,98 @@
+import logging
+import pytest
+import time
+from pathlib import Path
+
+from tests.integration_tests.clouds import ImageSpecification, IntegrationCloud
+from tests.integration_tests.conftest import (
+ get_validated_source,
+ session_start_time,
+)
+
+log = logging.getLogger('integration_testing')
+
+USER_DATA = """\
+#cloud-config
+hostname: SRU-worked
+"""
+
+
+def _output_to_compare(instance, file_path, netcfg_path):
+ commands = [
+ 'hostname',
+ 'dpkg-query --show cloud-init',
+ 'cat /run/cloud-init/result.json',
+ # 'cloud-init init' helps us understand if our pickling upgrade paths
+ # have broken across re-constitution of a cached datasource. Some
+ # platforms invalidate their datasource cache on reboot, so we run
+ # it here to ensure we get a dirty run.
+ 'cloud-init init',
+ 'grep Trace /var/log/cloud-init.log',
+ 'cloud-id',
+ 'cat {}'.format(netcfg_path),
+ 'systemd-analyze',
+ 'systemd-analyze blame',
+ 'cloud-init analyze show',
+ 'cloud-init analyze blame',
+ ]
+ with file_path.open('w') as f:
+ for command in commands:
+ f.write('===== {} ====='.format(command) + '\n')
+ f.write(instance.execute(command) + '\n')
+
+
+def _restart(instance):
+ # work around pad.lv/1908287
+ instance.restart()
+ if not instance.execute('cloud-init status --wait --long').ok:
+ for _ in range(10):
+ time.sleep(5)
+ result = instance.execute('cloud-init status --wait --long')
+ if result.ok:
+ return
+ raise Exception("Cloud-init didn't finish starting up")
+
+
+@pytest.mark.sru_2020_11
+def test_upgrade(session_cloud: IntegrationCloud):
+ source = get_validated_source(session_cloud)
+ if not source.installs_new_version():
+ pytest.skip("Install method '{}' not supported for this test".format(
+ source
+ ))
+ return # type checking doesn't understand that skip raises
+
+ launch_kwargs = {
+ 'image_id': session_cloud._get_initial_image(),
+ }
+
+ image = ImageSpecification.from_os_image()
+
+ # Get the paths to write test logs
+ output_dir = Path(session_cloud.settings.LOCAL_LOG_PATH)
+ output_dir.mkdir(parents=True, exist_ok=True)
+ base_filename = 'test_upgrade_{platform}_{os}_{{stage}}_{time}.log'.format(
+ platform=session_cloud.settings.PLATFORM,
+ os=image.release,
+ time=session_start_time,
+ )
+ before_path = output_dir / base_filename.format(stage='before')
+ after_path = output_dir / base_filename.format(stage='after')
+
+ # Get the network cfg file
+ netcfg_path = '/dev/null'
+ if image.os == 'ubuntu':
+ netcfg_path = '/etc/netplan/50-cloud-init.yaml'
+ if image.release == 'xenial':
+ netcfg_path = '/etc/network/interfaces.d/50-cloud-init.cfg'
+
+ with session_cloud.launch(
+ launch_kwargs=launch_kwargs, user_data=USER_DATA, wait=True,
+ ) as instance:
+ _output_to_compare(instance, before_path, netcfg_path)
+ instance.install_new_cloud_init(source, take_snapshot=False)
+ instance.execute('hostname something-else')
+ _restart(instance)
+ _output_to_compare(instance, after_path, netcfg_path)
+
+ log.info('Wrote upgrade test logs to %s and %s', before_path, after_path)
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index e363c1f9..dc615309 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -159,6 +159,22 @@ SECONDARY_INTERFACE = {
}
}
+SECONDARY_INTERFACE_NO_IP = {
+ "macAddress": "220D3A047598",
+ "ipv6": {
+ "ipAddress": []
+ },
+ "ipv4": {
+ "subnet": [
+ {
+ "prefix": "24",
+ "address": "10.0.1.0"
+ }
+ ],
+ "ipAddress": []
+ }
+}
+
IMDS_NETWORK_METADATA = {
"interface": [
{
@@ -1139,6 +1155,30 @@ scbus-1 on xpt0 bus 0
dsrc.get_data()
self.assertEqual(expected_network_config, dsrc.network_config)
+ @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
+ return_value=None)
+ def test_network_config_set_from_imds_for_secondary_nic_no_ip(
+ self, m_driver):
+ """If an IP address is empty then there should no config for it."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ odata = {}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': sys_cfg}
+ expected_network_config = {
+ 'ethernets': {
+ 'eth0': {'set-name': 'eth0',
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'dhcp6': False,
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100}}},
+ 'version': 2}
+ imds_data = copy.deepcopy(NETWORK_METADATA)
+ imds_data['network']['interface'].append(SECONDARY_INTERFACE_NO_IP)
+ self.m_get_metadata_from_imds.return_value = imds_data
+ dsrc = self._get_ds(data)
+ dsrc.get_data()
+ self.assertEqual(expected_network_config, dsrc.network_config)
+
def test_availability_zone_set_from_imds(self):
"""Datasource.availability returns IMDS platformFaultDomain."""
sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
@@ -1757,7 +1797,9 @@ scbus-1 on xpt0 bus 0
dsrc.get_data()
dsrc.setup(True)
ssh_keys = dsrc.get_public_ssh_keys()
- self.assertEqual(ssh_keys, ['key1'])
+ # Temporarily alter this test so that SSH public keys
+ # from IMDS are *not* going to be in use to fix a regression.
+ self.assertEqual(ssh_keys, [])
self.assertEqual(m_parse_certificates.call_count, 0)
@mock.patch(MOCKPATH + 'get_metadata_from_imds')
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index 16773de5..dce01f5d 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -17,6 +17,7 @@ from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceOVF as dsovf
from cloudinit.sources.helpers.vmware.imc.config_custom_script import (
CustomScriptNotFound)
+from cloudinit.safeyaml import YAMLError
MPATH = 'cloudinit.sources.DataSourceOVF.'
@@ -138,16 +139,29 @@ class TestDatasourceOVF(CiTestCase):
'DEBUG: No system-product-name found', self.logs.getvalue())
def test_get_data_no_vmware_customization_disabled(self):
- """When vmware customization is disabled via sys_cfg log a message."""
+ """When cloud-init workflow for vmware is disabled via sys_cfg and
+ no meta data provided, log a message.
+ """
paths = Paths({'cloud_dir': self.tdir})
ds = self.datasource(
sys_cfg={'disable_vmware_customization': True}, distro={},
paths=paths)
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CUSTOM-SCRIPT]
+ SCRIPT-NAME = test-script
+ [MISC]
+ MARKER-ID = 12345345
+ """)
+ util.write_file(conf_file, conf_content)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
{'dmi.read_dmi_data': 'vmware',
'transport_iso9660': NOT_FOUND,
- 'transport_vmware_guestinfo': NOT_FOUND},
+ 'transport_vmware_guestinfo': NOT_FOUND,
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
@@ -344,6 +358,279 @@ class TestDatasourceOVF(CiTestCase):
'vmware (%s/seed/ovf-env.xml)' % self.tdir,
ds.subplatform)
+ def test_get_data_cloudinit_metadata_json(self):
+ """Test metadata can be loaded to cloud-init metadata and network.
+ The metadata format is json.
+ """
+ paths = Paths({'cloud_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': True}, distro={},
+ paths=paths)
+ # Prepare the conf file
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """)
+ util.write_file(conf_file, conf_content)
+ # Prepare the meta data file
+ metadata_file = self.tmp_path('test-meta', self.tdir)
+ metadata_content = dedent("""\
+ {
+ "instance-id": "cloud-vm",
+ "local-hostname": "my-host.domain.com",
+ "network": {
+ "version": 2,
+ "ethernets": {
+ "eths": {
+ "match": {
+ "name": "ens*"
+ },
+ "dhcp4": true
+ }
+ }
+ }
+ }
+ """)
+ util.write_file(metadata_file, metadata_content)
+
+ with mock.patch(MPATH + 'set_customization_status',
+ return_value=('msg', b'')):
+ result = wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'dmi.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'collect_imc_file_paths': [self.tdir + '/test-meta', '', ''],
+ 'get_nics_to_enable': ''},
+ ds._get_data)
+
+ self.assertTrue(result)
+ self.assertEqual("cloud-vm", ds.metadata['instance-id'])
+ self.assertEqual("my-host.domain.com", ds.metadata['local-hostname'])
+ self.assertEqual(2, ds.network_config['version'])
+ self.assertTrue(ds.network_config['ethernets']['eths']['dhcp4'])
+
+ def test_get_data_cloudinit_metadata_yaml(self):
+ """Test metadata can be loaded to cloud-init metadata and network.
+ The metadata format is yaml.
+ """
+ paths = Paths({'cloud_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': True}, distro={},
+ paths=paths)
+ # Prepare the conf file
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """)
+ util.write_file(conf_file, conf_content)
+ # Prepare the meta data file
+ metadata_file = self.tmp_path('test-meta', self.tdir)
+ metadata_content = dedent("""\
+ instance-id: cloud-vm
+ local-hostname: my-host.domain.com
+ network:
+ version: 2
+ ethernets:
+ nics:
+ match:
+ name: ens*
+ dhcp4: yes
+ """)
+ util.write_file(metadata_file, metadata_content)
+
+ with mock.patch(MPATH + 'set_customization_status',
+ return_value=('msg', b'')):
+ result = wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'dmi.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'collect_imc_file_paths': [self.tdir + '/test-meta', '', ''],
+ 'get_nics_to_enable': ''},
+ ds._get_data)
+
+ self.assertTrue(result)
+ self.assertEqual("cloud-vm", ds.metadata['instance-id'])
+ self.assertEqual("my-host.domain.com", ds.metadata['local-hostname'])
+ self.assertEqual(2, ds.network_config['version'])
+ self.assertTrue(ds.network_config['ethernets']['nics']['dhcp4'])
+
+ def test_get_data_cloudinit_metadata_not_valid(self):
+ """Test metadata is not JSON or YAML format.
+ """
+ paths = Paths({'cloud_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': True}, distro={},
+ paths=paths)
+
+ # Prepare the conf file
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """)
+ util.write_file(conf_file, conf_content)
+
+ # Prepare the meta data file
+ metadata_file = self.tmp_path('test-meta', self.tdir)
+ metadata_content = "[This is not json or yaml format]a=b"
+ util.write_file(metadata_file, metadata_content)
+
+ with mock.patch(MPATH + 'set_customization_status',
+ return_value=('msg', b'')):
+ with self.assertRaises(YAMLError) as context:
+ wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'dmi.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'collect_imc_file_paths': [
+ self.tdir + '/test-meta', '', ''
+ ],
+ 'get_nics_to_enable': ''},
+ ds.get_data)
+
+ self.assertIn("expected '<document start>', but found '<scalar>'",
+ str(context.exception))
+
+ def test_get_data_cloudinit_metadata_not_found(self):
+ """Test metadata file can't be found.
+ """
+ paths = Paths({'cloud_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': True}, distro={},
+ paths=paths)
+ # Prepare the conf file
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CLOUDINIT]
+ METADATA = test-meta
+ """)
+ util.write_file(conf_file, conf_content)
+ # Don't prepare the meta data file
+
+ with mock.patch(MPATH + 'set_customization_status',
+ return_value=('msg', b'')):
+ with self.assertRaises(FileNotFoundError) as context:
+ wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'dmi.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'get_nics_to_enable': ''},
+ ds.get_data)
+
+ self.assertIn('is not found', str(context.exception))
+
+ def test_get_data_cloudinit_userdata(self):
+ """Test user data can be loaded to cloud-init user data.
+ """
+ paths = Paths({'cloud_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': False}, distro={},
+ paths=paths)
+
+ # Prepare the conf file
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CLOUDINIT]
+ METADATA = test-meta
+ USERDATA = test-user
+ """)
+ util.write_file(conf_file, conf_content)
+
+ # Prepare the meta data file
+ metadata_file = self.tmp_path('test-meta', self.tdir)
+ metadata_content = dedent("""\
+ instance-id: cloud-vm
+ local-hostname: my-host.domain.com
+ network:
+ version: 2
+ ethernets:
+ nics:
+ match:
+ name: ens*
+ dhcp4: yes
+ """)
+ util.write_file(metadata_file, metadata_content)
+
+ # Prepare the user data file
+ userdata_file = self.tmp_path('test-user', self.tdir)
+ userdata_content = "This is the user data"
+ util.write_file(userdata_file, userdata_content)
+
+ with mock.patch(MPATH + 'set_customization_status',
+ return_value=('msg', b'')):
+ result = wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'dmi.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'collect_imc_file_paths': [self.tdir + '/test-meta',
+ self.tdir + '/test-user', ''],
+ 'get_nics_to_enable': ''},
+ ds._get_data)
+
+ self.assertTrue(result)
+ self.assertEqual("cloud-vm", ds.metadata['instance-id'])
+ self.assertEqual(userdata_content, ds.userdata_raw)
+
+ def test_get_data_cloudinit_userdata_not_found(self):
+ """Test userdata file can't be found.
+ """
+ paths = Paths({'cloud_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': True}, distro={},
+ paths=paths)
+
+ # Prepare the conf file
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CLOUDINIT]
+ METADATA = test-meta
+ USERDATA = test-user
+ """)
+ util.write_file(conf_file, conf_content)
+
+ # Prepare the meta data file
+ metadata_file = self.tmp_path('test-meta', self.tdir)
+ metadata_content = dedent("""\
+ instance-id: cloud-vm
+ local-hostname: my-host.domain.com
+ network:
+ version: 2
+ ethernets:
+ nics:
+ match:
+ name: ens*
+ dhcp4: yes
+ """)
+ util.write_file(metadata_file, metadata_content)
+
+ # Don't prepare the user data file
+
+ with mock.patch(MPATH + 'set_customization_status',
+ return_value=('msg', b'')):
+ with self.assertRaises(FileNotFoundError) as context:
+ wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'dmi.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'get_nics_to_enable': ''},
+ ds.get_data)
+
+ self.assertIn('is not found', str(context.exception))
+
class TestTransportIso9660(CiTestCase):
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index e74a0a08..6e3831ed 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -47,12 +47,20 @@ class TestConfig(TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "ca-certs"
- distro = self._fetch_distro('ubuntu')
self.paths = None
- self.cloud = cloud.Cloud(None, self.paths, None, distro, None)
self.log = logging.getLogger("TestNoConfig")
self.args = []
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
+
+ def _get_cloud(self, kind):
+ distro = self._fetch_distro(kind)
+ return cloud.Cloud(None, self.paths, None, distro, None)
+
+ def _mock_init(self):
self.mocks = ExitStack()
self.addCleanup(self.mocks.close)
@@ -64,11 +72,6 @@ class TestConfig(TestCase):
self.mock_remove = self.mocks.enter_context(
mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))
- def _fetch_distro(self, kind):
- cls = distros.fetch(kind)
- paths = helpers.Paths({})
- return cls(kind, {}, paths)
-
def test_no_trusted_list(self):
"""
Test that no certificates are written if the 'trusted' key is not
@@ -76,71 +79,95 @@ class TestConfig(TestCase):
"""
config = {"ca-certs": {}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty."""
config = {"ca-certs": {"trusted": []}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(['CERT1'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.mock_add.assert_called_once_with(conf, ['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(['CERT1', 'CERT2'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.mock_add.assert_called_once_with(conf, ['CERT1', 'CERT2'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 1)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": False}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.assertEqual(self.mock_add.call_count, 0)
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 0)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
- cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ for distro_name in cc_ca_certs.distros:
+ self._mock_init()
+ cloud = self._get_cloud(distro_name)
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ cc_ca_certs.handle(self.name, config, cloud, self.log, self.args)
- self.mock_add.assert_called_once_with(['CERT1'])
- self.assertEqual(self.mock_update.call_count, 1)
- self.assertEqual(self.mock_remove.call_count, 1)
+ self.mock_add.assert_called_once_with(conf, ['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
class TestAddCaCerts(TestCase):
@@ -152,12 +179,20 @@ class TestAddCaCerts(TestCase):
self.paths = helpers.Paths({
'cloud_dir': tmpdir,
})
+ self.add_patch("cloudinit.config.cc_ca_certs.os.stat", "m_stat")
+
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
def test_no_certs_in_list(self):
"""Test that no certificate are written if not provided."""
- with mock.patch.object(util, 'write_file') as mockobj:
- cc_ca_certs.add_ca_certs([])
- self.assertEqual(mockobj.call_count, 0)
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ with mock.patch.object(util, 'write_file') as mockobj:
+ cc_ca_certs.add_ca_certs(conf, [])
+ self.assertEqual(mockobj.call_count, 0)
def test_single_cert_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -167,20 +202,28 @@ class TestAddCaCerts(TestCase):
ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ self.m_stat.return_value.st_size = 1
- cc_ca_certs.add_ca_certs([cert])
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0o644),
- mock.call("/etc/ca-certificates.conf", expected, omode="wb")])
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
+
+ cc_ca_certs.add_ca_certs(conf, [cert])
+
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_full_path'],
+ cert, mode=0o644)])
+ if conf['ca_cert_config'] is not None:
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_config'],
+ expected, omode="wb")])
+ mock_load.assert_called_once_with(conf['ca_cert_config'])
def test_single_cert_no_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -189,24 +232,31 @@ class TestAddCaCerts(TestCase):
ca_certs_content = "line1\nline2\nline3"
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ self.m_stat.return_value.st_size = 1
+
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
- cc_ca_certs.add_ca_certs([cert])
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0o644),
- mock.call("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content,
- "cloud-init-ca-certs.crt"),
- omode="wb")])
+ cc_ca_certs.add_ca_certs(conf, [cert])
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_full_path'],
+ cert, mode=0o644)])
+ if conf['ca_cert_config'] is not None:
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_config'],
+ "%s\n%s\n" % (ca_certs_content,
+ conf['ca_cert_filename']),
+ omode="wb")])
+
+ mock_load.assert_called_once_with(conf['ca_cert_config'])
def test_single_cert_to_empty_existing_ca_file(self):
"""Test adding a single certificate to the trusted CAs
@@ -215,20 +265,22 @@ class TestAddCaCerts(TestCase):
expected = "cloud-init-ca-certs.crt\n"
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file', autospec=True))
- mock_stat = mocks.enter_context(
- mock.patch("cloudinit.config.cc_ca_certs.os.stat")
- )
- mock_stat.return_value.st_size = 0
+ self.m_stat.return_value.st_size = 0
- cc_ca_certs.add_ca_certs([cert])
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ with mock.patch.object(util, 'write_file',
+ autospec=True) as m_write:
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0o644),
- mock.call("/etc/ca-certificates.conf", expected, omode="wb")])
+ cc_ca_certs.add_ca_certs(conf, [cert])
+
+ m_write.assert_has_calls([
+ mock.call(conf['ca_cert_full_path'],
+ cert, mode=0o644)])
+ if conf['ca_cert_config'] is not None:
+ m_write.assert_has_calls([
+ mock.call(conf['ca_cert_config'],
+ expected, omode="wb")])
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
@@ -236,32 +288,41 @@ class TestAddCaCerts(TestCase):
expected_cert_file = "\n".join(certs)
ca_certs_content = "line1\nline2\nline3"
- with ExitStack() as mocks:
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_load = mocks.enter_context(
- mock.patch.object(util, 'load_file',
- return_value=ca_certs_content))
+ self.m_stat.return_value.st_size = 1
+
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- cc_ca_certs.add_ca_certs(certs)
+ cc_ca_certs.add_ca_certs(conf, certs)
- mock_write.assert_has_calls([
- mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- expected_cert_file, mode=0o644),
- mock.call("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content,
- "cloud-init-ca-certs.crt"),
- omode='wb')])
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_full_path'],
+ expected_cert_file, mode=0o644)])
+ if conf['ca_cert_config'] is not None:
+ mock_write.assert_has_calls([
+ mock.call(conf['ca_cert_config'],
+ "%s\n%s\n" % (ca_certs_content,
+ conf['ca_cert_filename']),
+ omode='wb')])
- mock_load.assert_called_once_with("/etc/ca-certificates.conf")
+ mock_load.assert_called_once_with(conf['ca_cert_config'])
class TestUpdateCaCerts(unittest.TestCase):
def test_commands(self):
- with mock.patch.object(subp, 'subp') as mockobj:
- cc_ca_certs.update_ca_certs()
- mockobj.assert_called_once_with(
- ["update-ca-certificates"], capture=False)
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+ with mock.patch.object(subp, 'subp') as mockobj:
+ cc_ca_certs.update_ca_certs(conf)
+ mockobj.assert_called_once_with(
+ conf['ca_cert_update_cmd'], capture=False)
class TestRemoveDefaultCaCerts(TestCase):
@@ -275,24 +336,31 @@ class TestRemoveDefaultCaCerts(TestCase):
})
def test_commands(self):
- with ExitStack() as mocks:
- mock_delete = mocks.enter_context(
- mock.patch.object(util, 'delete_dir_contents'))
- mock_write = mocks.enter_context(
- mock.patch.object(util, 'write_file'))
- mock_subp = mocks.enter_context(mock.patch.object(subp, 'subp'))
-
- cc_ca_certs.remove_default_ca_certs('ubuntu')
-
- mock_delete.assert_has_calls([
- mock.call("/usr/share/ca-certificates/"),
- mock.call("/etc/ssl/certs/")])
-
- mock_write.assert_called_once_with(
- "/etc/ca-certificates.conf", "", mode=0o644)
-
- mock_subp.assert_called_once_with(
- ('debconf-set-selections', '-'),
- "ca-certificates ca-certificates/trust_new_crts select no")
+ for distro_name in cc_ca_certs.distros:
+ conf = cc_ca_certs._distro_ca_certs_configs(distro_name)
+
+ with ExitStack() as mocks:
+ mock_delete = mocks.enter_context(
+ mock.patch.object(util, 'delete_dir_contents'))
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_subp = mocks.enter_context(
+ mock.patch.object(subp, 'subp'))
+
+ cc_ca_certs.remove_default_ca_certs(distro_name, conf)
+
+ mock_delete.assert_has_calls([
+ mock.call(conf['ca_cert_path']),
+ mock.call(conf['ca_cert_system_path'])])
+
+ if conf['ca_cert_config'] is not None:
+ mock_write.assert_called_once_with(
+ conf['ca_cert_config'], "", mode=0o644)
+
+ if distro_name in ['debian', 'ubuntu']:
+ mock_subp.assert_called_once_with(
+ ('debconf-set-selections', '-'),
+ "ca-certificates \
+ca-certificates/trust_new_crts select no")
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 70453683..38d934d4 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1365,10 +1365,11 @@ NETWORK_CONFIGS = {
},
'expected_sysconfig_rhel': {
'ifcfg-iface0': textwrap.dedent("""\
- BOOTPROTO=none
+ BOOTPROTO=dhcp
DEVICE=iface0
DHCPV6C=yes
IPV6INIT=yes
+ IPV6_AUTOCONF=no
IPV6_FORCE_ACCEPT_RA=yes
DEVICE=iface0
NM_CONTROLLED=no
@@ -4819,6 +4820,9 @@ class TestEniRoundTrip(CiTestCase):
{'type': 'route', 'id': 6,
'metric': 1, 'destination': '10.0.200.0/16',
'gateway': '172.23.31.1'},
+ {'type': 'route', 'id': 7,
+ 'metric': 1, 'destination': '10.0.0.100/32',
+ 'gateway': '172.23.31.1'},
]
files = self._render_and_read(
@@ -4842,6 +4846,10 @@ class TestEniRoundTrip(CiTestCase):
'172.23.31.1 metric 1 || true'),
('pre-down route del -net 10.0.200.0/16 gw '
'172.23.31.1 metric 1 || true'),
+ ('post-up route add -host 10.0.0.100/32 gw '
+ '172.23.31.1 metric 1 || true'),
+ ('pre-down route del -host 10.0.0.100/32 gw '
+ '172.23.31.1 metric 1 || true'),
]
found = files['/etc/network/interfaces'].splitlines()
diff --git a/tests/unittests/test_vmware_config_file.py b/tests/unittests/test_vmware_config_file.py
index 9c7d25fa..430cc69f 100644
--- a/tests/unittests/test_vmware_config_file.py
+++ b/tests/unittests/test_vmware_config_file.py
@@ -525,5 +525,21 @@ class TestVmwareNetConfig(CiTestCase):
'gateway': '10.20.87.253'}]}],
nc.generate())
+ def test_meta_data(self):
+ cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
+ conf = Config(cf)
+ self.assertIsNone(conf.meta_data_name)
+ cf._insertKey("CLOUDINIT|METADATA", "test-metadata")
+ conf = Config(cf)
+ self.assertEqual("test-metadata", conf.meta_data_name)
+
+ def test_user_data(self):
+ cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
+ conf = Config(cf)
+ self.assertIsNone(conf.user_data_name)
+ cf._insertKey("CLOUDINIT|USERDATA", "test-userdata")
+ conf = Config(cf)
+ self.assertEqual("test-userdata", conf.user_data_name)
+
# vi: ts=4 expandtab