diff options
Diffstat (limited to 'tests')
32 files changed, 1757 insertions, 296 deletions
diff --git a/tests/cloud_tests/testcases/examples/TODO.md b/tests/cloud_tests/testcases/examples/TODO.md index 8db0e98e..cde699a7 100644 --- a/tests/cloud_tests/testcases/examples/TODO.md +++ b/tests/cloud_tests/testcases/examples/TODO.md @@ -6,7 +6,7 @@ Below lists each of the issing examples and why it is not currently added. - Puppet (takes > 60 seconds to run) - Manage resolve.conf (lxd backend overrides changes) - Adding a yum repository (need centos system) - - Register RedHat Subscription (need centos system + subscription) + - Register Red Hat Subscription (need centos system + subscription) - Adjust mount points mounted (need multiple disks) - Call a url when finished (need end point) - Reboot/poweroff when finished (how to test) diff --git a/tests/integration_tests/bugs/test_gh570.py b/tests/integration_tests/bugs/test_gh570.py new file mode 100644 index 00000000..534cfb9a --- /dev/null +++ b/tests/integration_tests/bugs/test_gh570.py @@ -0,0 +1,39 @@ +"""Integration test for #570. + +Test that we can add optional vendor-data to the seedfrom file in a +NoCloud environment +""" + +from tests.integration_tests.instances import IntegrationInstance +import pytest + +VENDOR_DATA = """\ +#cloud-config +runcmd: + - touch /var/tmp/seeded_vendordata_test_file +""" + + +# Only running on LXD because we need NoCloud for this test +@pytest.mark.sru_2020_11 +@pytest.mark.lxd_container +@pytest.mark.lxd_vm +def test_nocloud_seedfrom_vendordata(client: IntegrationInstance): + seed_dir = '/var/tmp/test_seed_dir' + result = client.execute( + "mkdir {seed_dir} && " + "touch {seed_dir}/user-data && " + "touch {seed_dir}/meta-data && " + "echo 'seedfrom: {seed_dir}/' > " + "/var/lib/cloud/seed/nocloud-net/meta-data".format(seed_dir=seed_dir) + ) + assert result.return_code == 0 + + client.write_to_file( + '{}/vendor-data'.format(seed_dir), + VENDOR_DATA, + ) + client.execute('cloud-init clean --logs') + client.restart() + assert client.execute('cloud-init status').ok + assert 'seeded_vendordata_test_file' in client.execute('ls /var/tmp') diff --git a/tests/integration_tests/bugs/test_gh626.py b/tests/integration_tests/bugs/test_gh626.py new file mode 100644 index 00000000..2d336462 --- /dev/null +++ b/tests/integration_tests/bugs/test_gh626.py @@ -0,0 +1,42 @@ +"""Integration test for gh-626. + +Ensure if wakeonlan is specified in the network config that it is rendered +in the /etc/network/interfaces or netplan config. +""" + +import pytest +import yaml + +from tests.integration_tests.clouds import ImageSpecification +from tests.integration_tests.instances import IntegrationInstance + + +NETWORK_CONFIG = """\ +version: 2 +ethernets: + eth0: + dhcp4: true + wakeonlan: true +""" + +EXPECTED_ENI_END = """\ +iface eth0 inet dhcp + ethernet-wol g""" + + +@pytest.mark.sru_2020_11 +@pytest.mark.lxd_container +@pytest.mark.lxd_vm +@pytest.mark.lxd_config_dict({ + 'user.network-config': NETWORK_CONFIG +}) +def test_wakeonlan(client: IntegrationInstance): + if ImageSpecification.from_os_image().release == 'xenial': + eni = client.execute('cat /etc/network/interfaces.d/50-cloud-init.cfg') + assert eni.endswith(EXPECTED_ENI_END) + return + + netplan_cfg = client.execute('cat /etc/netplan/50-cloud-init.yaml') + netplan_yaml = yaml.safe_load(netplan_cfg) + assert 'wakeonlan' in netplan_yaml['network']['ethernets']['eth0'] + assert netplan_yaml['network']['ethernets']['eth0']['wakeonlan'] is True diff --git a/tests/integration_tests/bugs/test_gh632.py b/tests/integration_tests/bugs/test_gh632.py new file mode 100644 index 00000000..3c1f9347 --- /dev/null +++ b/tests/integration_tests/bugs/test_gh632.py @@ -0,0 +1,33 @@ +"""Integration test for gh-632. + +Verify that if cloud-init is using DataSourceRbxCloud, there is +no traceback if the metadata disk cannot be found. +""" + +import pytest + +from tests.integration_tests.instances import IntegrationInstance + + +# With some datasource hacking, we can run this on a NoCloud instance +@pytest.mark.lxd_container +@pytest.mark.lxd_vm +@pytest.mark.sru_2020_11 +def test_datasource_rbx_no_stacktrace(client: IntegrationInstance): + client.write_to_file( + '/etc/cloud/cloud.cfg.d/90_dpkg.cfg', + 'datasource_list: [ RbxCloud, NoCloud ]\n', + ) + client.write_to_file( + '/etc/cloud/ds-identify.cfg', + 'policy: enabled\n', + ) + client.execute('cloud-init clean --logs') + client.restart() + + log = client.read_from_file('/var/log/cloud-init.log') + assert 'WARNING' not in log + assert 'Traceback' not in log + assert 'Failed to load metadata and userdata' not in log + assert ("Getting data from <class 'cloudinit.sources.DataSourceRbxCloud." + "DataSourceRbxCloud'> failed") not in log diff --git a/tests/integration_tests/bugs/test_gh668.py b/tests/integration_tests/bugs/test_gh668.py new file mode 100644 index 00000000..a3a0c374 --- /dev/null +++ b/tests/integration_tests/bugs/test_gh668.py @@ -0,0 +1,37 @@ +"""Integration test for gh-668. + +Ensure that static route to host is working correctly. +The original problem is specific to the ENI renderer but that test is suitable +for all network configuration outputs. +""" + +import pytest + +from tests.integration_tests.instances import IntegrationInstance + + +DESTINATION_IP = "172.16.0.10" +GATEWAY_IP = "10.0.0.100" + +NETWORK_CONFIG = """\ +version: 2 +ethernets: + eth0: + addresses: [10.0.0.10/8] + dhcp4: false + routes: + - to: {}/32 + via: {} +""".format(DESTINATION_IP, GATEWAY_IP) + +EXPECTED_ROUTE = "{} via {}".format(DESTINATION_IP, GATEWAY_IP) + + +@pytest.mark.lxd_container +@pytest.mark.lxd_vm +@pytest.mark.lxd_config_dict({ + "user.network-config": NETWORK_CONFIG, +}) +def test_static_route_to_host(client: IntegrationInstance): + route = client.execute("ip route | grep {}".format(DESTINATION_IP)) + assert route.startswith(EXPECTED_ROUTE) diff --git a/tests/integration_tests/bugs/test_gh671.py b/tests/integration_tests/bugs/test_gh671.py new file mode 100644 index 00000000..5e90cdda --- /dev/null +++ b/tests/integration_tests/bugs/test_gh671.py @@ -0,0 +1,55 @@ +"""Integration test for gh-671. + +Verify that on Azure that if a default user and password are specified +through the Azure API that a change in the default password overwrites +the old password +""" + +import crypt + +import pytest + +from tests.integration_tests.clouds import IntegrationCloud + +OLD_PASSWORD = 'DoIM33tTheComplexityRequirements!??' +NEW_PASSWORD = 'DoIM33tTheComplexityRequirementsNow!??' + + +def _check_password(instance, unhashed_password): + shadow_password = instance.execute('getent shadow ubuntu').split(':')[1] + salt = shadow_password.rsplit('$', 1)[0] + hashed_password = crypt.crypt(unhashed_password, salt) + assert shadow_password == hashed_password + + +@pytest.mark.azure +@pytest.mark.sru_2020_11 +def test_update_default_password(setup_image, session_cloud: IntegrationCloud): + os_profile = { + 'os_profile': { + 'admin_password': '', + 'linux_configuration': { + 'disable_password_authentication': False + } + } + } + os_profile['os_profile']['admin_password'] = OLD_PASSWORD + instance1 = session_cloud.launch(launch_kwargs={'vm_params': os_profile}) + + _check_password(instance1, OLD_PASSWORD) + + snapshot_id = instance1.cloud.cloud_instance.snapshot( + instance1.instance, + delete_provisioned_user=False + ) + + os_profile['os_profile']['admin_password'] = NEW_PASSWORD + try: + with session_cloud.launch(launch_kwargs={ + 'image_id': snapshot_id, + 'vm_params': os_profile, + }) as instance2: + _check_password(instance2, NEW_PASSWORD) + finally: + session_cloud.cloud_instance.delete_image(snapshot_id) + instance1.destroy() diff --git a/tests/integration_tests/bugs/test_lp1813396.py b/tests/integration_tests/bugs/test_lp1813396.py new file mode 100644 index 00000000..7ad0e809 --- /dev/null +++ b/tests/integration_tests/bugs/test_lp1813396.py @@ -0,0 +1,34 @@ +"""Integration test for lp-1813396 + +Ensure gpg is called with no tty flag. +""" + +import pytest + +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.log_utils import verify_ordered_items_in_text + + +USER_DATA = """\ +#cloud-config +apt: + sources: + cloudinit: + source: 'deb [arch=amd64] http://ppa.launchpad.net/cloud-init-dev/daily/ubuntu focal main' + keyserver: keyserver.ubuntu.com + keyid: E4D304DF +""" # noqa: E501 + + +@pytest.mark.sru_2020_11 +@pytest.mark.user_data(USER_DATA) +def test_gpg_no_tty(client: IntegrationInstance): + log = client.read_from_file('/var/log/cloud-init.log') + to_verify = [ + "Running command ['gpg', '--no-tty', " + "'--keyserver=keyserver.ubuntu.com', '--recv-keys', 'E4D304DF'] " + "with allowed return codes [0] (shell=False, capture=True)", + "Imported key 'E4D304DF' from keyserver 'keyserver.ubuntu.com'", + "finish: modules-config/config-apt-configure: SUCCESS", + ] + verify_ordered_items_in_text(to_verify, log) diff --git a/tests/integration_tests/bugs/test_lp1898997.py b/tests/integration_tests/bugs/test_lp1898997.py new file mode 100644 index 00000000..54c88d82 --- /dev/null +++ b/tests/integration_tests/bugs/test_lp1898997.py @@ -0,0 +1,71 @@ +"""Integration test for LP: #1898997 + +cloud-init was incorrectly excluding Open vSwitch bridge members from its list +of interfaces. This meant that instances which had only one interface which +was in an Open vSwitch bridge would not boot correctly: cloud-init would not +find the expected physical interfaces, so would not apply network config. + +This test checks that cloud-init believes it has successfully applied the +network configuration, and confirms that the bridge can be used to ping the +default gateway. +""" +import pytest + +MAC_ADDRESS = "de:ad:be:ef:12:34" + + +NETWORK_CONFIG = """\ +bridges: + ovs-br: + dhcp4: true + interfaces: + - enp5s0 + macaddress: 52:54:00:d9:08:1c + mtu: 1500 + openvswitch: {{}} +ethernets: + enp5s0: + mtu: 1500 + set-name: enp5s0 + match: + macaddress: {} +version: 2 +""".format(MAC_ADDRESS) + + +@pytest.mark.lxd_config_dict({ + "user.network-config": NETWORK_CONFIG, + "volatile.eth0.hwaddr": MAC_ADDRESS, +}) +@pytest.mark.lxd_vm +@pytest.mark.not_bionic +@pytest.mark.not_xenial +@pytest.mark.sru_2020_11 +@pytest.mark.ubuntu +class TestInterfaceListingWithOpenvSwitch: + def test_ovs_member_interfaces_not_excluded(self, client): + # We need to install openvswitch for our provided network configuration + # to apply (on next boot), so DHCP on our default interface to fetch it + client.execute("dhclient enp5s0") + client.execute("apt update -qqy") + client.execute("apt-get install -qqy openvswitch-switch") + + # Now our networking config should successfully apply on a clean reboot + client.execute("cloud-init clean --logs") + client.restart() + + cloudinit_output = client.read_from_file("/var/log/cloud-init.log") + + # Confirm that the network configuration was applied successfully + assert "WARN" not in cloudinit_output + # Confirm that the applied network config created the OVS bridge + assert "ovs-br" in client.execute("ip addr") + + # Test that we can ping our gateway using our bridge + gateway = client.execute( + "ip -4 route show default | awk '{ print $3 }'" + ) + ping_result = client.execute( + "ping -c 1 -W 1 -I ovs-br {}".format(gateway) + ) + assert ping_result.ok diff --git a/tests/integration_tests/bugs/test_lp1900837.py b/tests/integration_tests/bugs/test_lp1900837.py index 3fe7d0d0..fcc2b751 100644 --- a/tests/integration_tests/bugs/test_lp1900837.py +++ b/tests/integration_tests/bugs/test_lp1900837.py @@ -22,7 +22,8 @@ class TestLogPermissionsNotResetOnReboot: assert "600" == _get_log_perms(client) # Reboot - client.instance.restart() + client.restart() + assert client.execute('cloud-init status').ok # Check that permissions are not reset on reboot assert "600" == _get_log_perms(client) diff --git a/tests/integration_tests/bugs/test_lp1910835.py b/tests/integration_tests/bugs/test_lp1910835.py new file mode 100644 index 00000000..87f92d5e --- /dev/null +++ b/tests/integration_tests/bugs/test_lp1910835.py @@ -0,0 +1,66 @@ +"""Integration test for LP: #1910835. + +If users do not provide an SSH key and instead ask Azure to generate a key for +them, the key material available in the IMDS may include CRLF sequences. Prior +to e56b55452549cb037da0a4165154ffa494e9678a, the Azure datasource handled keys +via a certificate, the tooling for which removed these sequences. This test +ensures that cloud-init does not regress support for this Azure behaviour. + +This test provides the SSH key configured for tests to the instance in two +ways: firstly, with CRLFs to mimic the generated keys, via the Azure API; +secondly, as user-data in unmodified form. This means that even on systems +which exhibit the bug fetching the platform's metadata, we can SSH into the SUT +to confirm this (instead of having to assert SSH failure; there are lots of +reasons SSH might fail). + +Once SSH'd in, we check that the two keys in .ssh/authorized_keys have the same +material: if the Azure datasource has removed the CRLFs correctly, then they +will match. +""" +import pytest + + +USER_DATA_TMPL = """\ +#cloud-config +ssh_authorized_keys: + - {}""" + + +@pytest.mark.sru_2021_01 +@pytest.mark.azure +def test_crlf_in_azure_metadata_ssh_keys(session_cloud, setup_image): + authorized_keys_path = "/home/{}/.ssh/authorized_keys".format( + session_cloud.cloud_instance.username + ) + # Pass in user-data to allow us to access the instance when the normal + # path fails + key_data = session_cloud.cloud_instance.key_pair.public_key_content + user_data = USER_DATA_TMPL.format(key_data) + # Throw a CRLF into the otherwise good key data, to emulate Azure's + # behaviour for generated keys + key_data = key_data[:20] + "\r\n" + key_data[20:] + vm_params = { + "os_profile": { + "linux_configuration": { + "ssh": { + "public_keys": [ + {"path": authorized_keys_path, "key_data": key_data} + ] + } + } + } + } + with session_cloud.launch( + launch_kwargs={"vm_params": vm_params, "user_data": user_data} + ) as client: + authorized_keys = ( + client.read_from_file(authorized_keys_path).strip().splitlines() + ) + # We expect one key from the cloud, one from user-data + assert 2 == len(authorized_keys) + # And those two keys should be the same, except for a possible key + # comment, which Azure strips out + assert ( + authorized_keys[0].rsplit(" ")[:2] + == authorized_keys[1].split(" ")[:2] + ) diff --git a/tests/integration_tests/clouds.py b/tests/integration_tests/clouds.py index 88ac4408..9eebb10a 100644 --- a/tests/integration_tests/clouds.py +++ b/tests/integration_tests/clouds.py @@ -6,7 +6,7 @@ from pycloudlib import EC2, GCE, Azure, OCI, LXDContainer, LXDVirtualMachine from pycloudlib.lxd.instance import LXDInstance import cloudinit -from cloudinit.subp import subp +from cloudinit.subp import subp, ProcessExecutionError from tests.integration_tests import integration_settings from tests.integration_tests.instances import ( IntegrationEc2Instance, @@ -25,6 +25,65 @@ except ImportError: log = logging.getLogger('integration_testing') +def _get_ubuntu_series() -> list: + """Use distro-info-data's ubuntu.csv to get a list of Ubuntu series""" + out = "" + try: + out, _err = subp(["ubuntu-distro-info", "-a"]) + except ProcessExecutionError: + log.info( + "ubuntu-distro-info (from the distro-info package) must be" + " installed to guess Ubuntu os/release" + ) + return out.splitlines() + + +class ImageSpecification: + """A specification of an image to launch for testing. + + If either of ``os`` and ``release`` are not specified, an attempt will be + made to infer the correct values for these on instantiation. + + :param image_id: + The image identifier used by the rest of the codebase to launch this + image. + :param os: + An optional string describing the operating system this image is for + (e.g. "ubuntu", "rhel", "freebsd"). + :param release: + A optional string describing the operating system release (e.g. + "focal", "8"; the exact values here will depend on the OS). + """ + + def __init__( + self, + image_id: str, + os: "Optional[str]" = None, + release: "Optional[str]" = None, + ): + if image_id in _get_ubuntu_series(): + if os is None: + os = "ubuntu" + if release is None: + release = image_id + + self.image_id = image_id + self.os = os + self.release = release + log.info( + "Detected image: image_id=%s os=%s release=%s", + self.image_id, + self.os, + self.release, + ) + + @classmethod + def from_os_image(cls): + """Return an ImageSpecification for integration_settings.OS_IMAGE.""" + parts = integration_settings.OS_IMAGE.split("::", 2) + return cls(*parts) + + class IntegrationCloud(ABC): datasource = None # type: Optional[str] integration_instance_cls = IntegrationInstance @@ -32,7 +91,23 @@ class IntegrationCloud(ABC): def __init__(self, settings=integration_settings): self.settings = settings self.cloud_instance = self._get_cloud_instance() - self.image_id = self._get_initial_image() + if settings.PUBLIC_SSH_KEY is not None: + # If we have a non-default key, use it. + self.cloud_instance.use_key( + settings.PUBLIC_SSH_KEY, name=settings.KEYPAIR_NAME + ) + elif settings.KEYPAIR_NAME is not None: + # Even if we're using the default key, it may still have a + # different name in the clouds, so we need to set it separately. + self.cloud_instance.key_pair.name = settings.KEYPAIR_NAME + self._released_image_id = self._get_initial_image() + self.snapshot_id = None + + @property + def image_id(self): + if self.snapshot_id: + return self.snapshot_id + return self._released_image_id def emit_settings_to_log(self) -> None: log.info( @@ -50,21 +125,20 @@ class IntegrationCloud(ABC): raise NotImplementedError def _get_initial_image(self): - image_id = self.settings.OS_IMAGE + image = ImageSpecification.from_os_image() try: - image_id = self.cloud_instance.released_image( - self.settings.OS_IMAGE) + return self.cloud_instance.released_image(image.image_id) except (ValueError, IndexError): - pass - return image_id + return image.image_id def _perform_launch(self, launch_kwargs): pycloudlib_instance = self.cloud_instance.launch(**launch_kwargs) - pycloudlib_instance.wait(raise_on_cloudinit_failure=False) return pycloudlib_instance def launch(self, user_data=None, launch_kwargs=None, settings=integration_settings): + if launch_kwargs is None: + launch_kwargs = {} if self.settings.EXISTING_INSTANCE_ID: log.info( 'Not launching instance due to EXISTING_INSTANCE_ID. ' @@ -76,10 +150,8 @@ class IntegrationCloud(ABC): kwargs = { 'image_id': self.image_id, 'user_data': user_data, - 'wait': False, } - if launch_kwargs: - kwargs.update(launch_kwargs) + kwargs.update(launch_kwargs) log.info( "Launching instance with launch_kwargs:\n{}".format( "\n".join("{}={}".format(*item) for item in kwargs.items()) @@ -87,9 +159,18 @@ class IntegrationCloud(ABC): ) pycloudlib_instance = self._perform_launch(kwargs) - log.info('Launched instance: %s', pycloudlib_instance) - return self.get_instance(pycloudlib_instance, settings) + instance = self.get_instance(pycloudlib_instance, settings) + if kwargs.get('wait', True): + # If we aren't waiting, we can't rely on command execution here + log.info( + 'cloud-init version: %s', + instance.execute("cloud-init --version") + ) + serial = instance.execute("grep serial /etc/cloud/build.info") + if serial: + log.info('image serial: %s', serial.split()[1]) + return instance def get_instance(self, cloud_instance, settings=integration_settings): return self.integration_instance_cls(self, cloud_instance, settings) @@ -100,6 +181,19 @@ class IntegrationCloud(ABC): def snapshot(self, instance): return self.cloud_instance.snapshot(instance, clean=True) + def delete_snapshot(self): + if self.snapshot_id: + if self.settings.KEEP_IMAGE: + log.info( + 'NOT deleting snapshot image created for this testrun ' + 'because KEEP_IMAGE is True: %s', self.snapshot_id) + else: + log.info( + 'Deleting snapshot image created for this testrun: %s', + self.snapshot_id + ) + self.cloud_instance.delete_image(self.snapshot_id) + class Ec2Cloud(IntegrationCloud): datasource = 'ec2' @@ -116,9 +210,6 @@ class GceCloud(IntegrationCloud): def _get_cloud_instance(self): return GCE( tag='gce-integration-test', - project=self.settings.GCE_PROJECT, - region=self.settings.GCE_REGION, - zone=self.settings.GCE_ZONE, ) @@ -130,7 +221,14 @@ class AzureCloud(IntegrationCloud): return Azure(tag='azure-integration-test') def destroy(self): - self.cloud_instance.delete_resource_group() + if self.settings.KEEP_INSTANCE: + log.info( + 'NOT deleting resource group because KEEP_INSTANCE is true ' + 'and deleting resource group would also delete instance. ' + 'Instance and resource group must both be manually deleted.' + ) + else: + self.cloud_instance.delete_resource_group() class OciCloud(IntegrationCloud): @@ -139,8 +237,7 @@ class OciCloud(IntegrationCloud): def _get_cloud_instance(self): return OCI( - tag='oci-integration-test', - compartment_id=self.settings.OCI_COMPARTMENT_ID + tag='oci-integration-test' ) @@ -174,7 +271,7 @@ class _LxdIntegrationCloud(IntegrationCloud): def _perform_launch(self, launch_kwargs): launch_kwargs['inst_type'] = launch_kwargs.pop('instance_type', None) - launch_kwargs.pop('wait') + wait = launch_kwargs.pop('wait', True) release = launch_kwargs.pop('image_id') try: @@ -190,8 +287,7 @@ class _LxdIntegrationCloud(IntegrationCloud): ) if self.settings.CLOUD_INIT_SOURCE == 'IN_PLACE': self._mount_source(pycloudlib_instance) - pycloudlib_instance.start(wait=False) - pycloudlib_instance.wait(raise_on_cloudinit_failure=False) + pycloudlib_instance.start(wait=wait) return pycloudlib_instance diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 73b44bfc..99dd8d9e 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -1,18 +1,29 @@ # This file is part of cloud-init. See LICENSE file for license information. +import datetime +import functools import logging -import os import pytest +import os import sys +from tarfile import TarFile from contextlib import contextmanager +from pathlib import Path from tests.integration_tests import integration_settings from tests.integration_tests.clouds import ( + AzureCloud, Ec2Cloud, GceCloud, - AzureCloud, - OciCloud, + ImageSpecification, + IntegrationCloud, LxdContainerCloud, LxdVmCloud, + OciCloud, + _LxdIntegrationCloud, +) +from tests.integration_tests.instances import ( + CloudInitSource, + IntegrationInstance, ) @@ -28,6 +39,9 @@ platforms = { 'lxd_container': LxdContainerCloud, 'lxd_vm': LxdVmCloud, } +os_list = ["ubuntu"] + +session_start_time = datetime.datetime.now().strftime('%y%m%d%H%M%S') def pytest_runtest_setup(item): @@ -54,6 +68,18 @@ def pytest_runtest_setup(item): if supported_platforms and current_platform not in supported_platforms: pytest.skip(unsupported_message) + image = ImageSpecification.from_os_image() + current_os = image.os + supported_os_set = set(os_list).intersection(test_marks) + if current_os and supported_os_set and current_os not in supported_os_set: + pytest.skip("Cannot run on OS {}".format(current_os)) + if 'unstable' in test_marks and not integration_settings.RUN_UNSTABLE: + pytest.skip('Test marked unstable. Manually remove mark to run it') + + current_release = image.release + if "not_{}".format(current_release) in test_marks: + pytest.skip("Cannot run on release {}".format(current_release)) + # disable_subp_usage is defined at a higher level, but we don't # want it applied here @@ -75,82 +101,137 @@ def session_cloud(): cloud = platforms[integration_settings.PLATFORM]() cloud.emit_settings_to_log() yield cloud - cloud.destroy() + try: + cloud.delete_snapshot() + finally: + cloud.destroy() + +def get_validated_source( + session_cloud: IntegrationCloud, + source=integration_settings.CLOUD_INIT_SOURCE +) -> CloudInitSource: + if source == 'NONE': + return CloudInitSource.NONE + elif source == 'IN_PLACE': + if session_cloud.datasource not in ['lxd_container', 'lxd_vm']: + raise ValueError( + 'IN_PLACE as CLOUD_INIT_SOURCE only works for LXD') + return CloudInitSource.IN_PLACE + elif source == 'PROPOSED': + return CloudInitSource.PROPOSED + elif source.startswith('ppa:'): + return CloudInitSource.PPA + elif os.path.isfile(str(source)): + return CloudInitSource.DEB_PACKAGE + raise ValueError( + 'Invalid value for CLOUD_INIT_SOURCE setting: {}'.format(source)) -@pytest.fixture(scope='session', autouse=True) -def setup_image(session_cloud): + +@pytest.fixture(scope='session') +def setup_image(session_cloud: IntegrationCloud): """Setup the target environment with the correct version of cloud-init. So we can launch instances / run tests with the correct image """ - client = None + + source = get_validated_source(session_cloud) + if not source.installs_new_version(): + return log.info('Setting up environment for %s', session_cloud.datasource) - if integration_settings.CLOUD_INIT_SOURCE == 'NONE': - pass # that was easy - elif integration_settings.CLOUD_INIT_SOURCE == 'IN_PLACE': - if session_cloud.datasource not in ['lxd_container', 'lxd_vm']: - raise ValueError( - 'IN_PLACE as CLOUD_INIT_SOURCE only works for LXD') - # The mount needs to happen after the instance is created, so - # no further action needed here - elif integration_settings.CLOUD_INIT_SOURCE == 'PROPOSED': - client = session_cloud.launch() - client.install_proposed_image() - elif integration_settings.CLOUD_INIT_SOURCE.startswith('ppa:'): - client = session_cloud.launch() - client.install_ppa(integration_settings.CLOUD_INIT_SOURCE) - elif os.path.isfile(str(integration_settings.CLOUD_INIT_SOURCE)): - client = session_cloud.launch() - client.install_deb() - else: - raise ValueError( - 'Invalid value for CLOUD_INIT_SOURCE setting: {}'.format( - integration_settings.CLOUD_INIT_SOURCE)) - if client: - # Even if we're keeping instances, we don't want to keep this - # one around as it was just for image creation - client.destroy() + client = session_cloud.launch() + client.install_new_cloud_init(source) + # Even if we're keeping instances, we don't want to keep this + # one around as it was just for image creation + client.destroy() log.info('Done with environment setup') +def _collect_logs(instance: IntegrationInstance, node_id: str, + test_failed: bool): + """Collect logs from remote instance. + + Args: + instance: The current IntegrationInstance to collect logs from + node_id: The pytest representation of this test, E.g.: + tests/integration_tests/test_example.py::TestExample.test_example + test_failed: If test failed or not + """ + if any([ + integration_settings.COLLECT_LOGS == 'NEVER', + integration_settings.COLLECT_LOGS == 'ON_ERROR' and not test_failed + ]): + return + instance.execute( + 'cloud-init collect-logs -u -t /var/tmp/cloud-init.tar.gz') + node_id_path = Path( + node_id + .replace('.py', '') # Having a directory with '.py' would be weird + .replace('::', os.path.sep) # Turn classes/tests into paths + .replace('[', '-') # For parametrized names + .replace(']', '') # For parameterized names + ) + log_dir = Path( + integration_settings.LOCAL_LOG_PATH + ) / session_start_time / node_id_path + log.info("Writing logs to %s", log_dir) + if not log_dir.exists(): + log_dir.mkdir(parents=True) + tarball_path = log_dir / 'cloud-init.tar.gz' + instance.pull_file('/var/tmp/cloud-init.tar.gz', tarball_path) + + tarball = TarFile.open(str(tarball_path)) + tarball.extractall(path=str(log_dir)) + tarball_path.unlink() + + @contextmanager -def _client(request, fixture_utils, session_cloud): +def _client(request, fixture_utils, session_cloud: IntegrationCloud): """Fixture implementation for the client fixtures. Launch the dynamic IntegrationClient instance using any provided userdata, yield to the test, then cleanup """ - user_data = fixture_utils.closest_marker_first_arg_or( - request, 'user_data', None) - name = fixture_utils.closest_marker_first_arg_or( - request, 'instance_name', None + getter = functools.partial( + fixture_utils.closest_marker_first_arg_or, request, default=None ) + user_data = getter('user_data') + name = getter('instance_name') + lxd_config_dict = getter('lxd_config_dict') + launch_kwargs = {} if name is not None: - launch_kwargs = {"name": name} + launch_kwargs["name"] = name + if lxd_config_dict is not None: + if not isinstance(session_cloud, _LxdIntegrationCloud): + pytest.skip("lxd_config_dict requires LXD") + launch_kwargs["config_dict"] = lxd_config_dict + with session_cloud.launch( user_data=user_data, launch_kwargs=launch_kwargs ) as instance: + previous_failures = request.session.testsfailed yield instance + test_failed = request.session.testsfailed - previous_failures > 0 + _collect_logs(instance, request.node.nodeid, test_failed) @pytest.yield_fixture -def client(request, fixture_utils, session_cloud): +def client(request, fixture_utils, session_cloud, setup_image): """Provide a client that runs for every test.""" with _client(request, fixture_utils, session_cloud) as client: yield client @pytest.yield_fixture(scope='module') -def module_client(request, fixture_utils, session_cloud): +def module_client(request, fixture_utils, session_cloud, setup_image): """Provide a client that runs once per module.""" with _client(request, fixture_utils, session_cloud) as client: yield client @pytest.yield_fixture(scope='class') -def class_client(request, fixture_utils, session_cloud): +def class_client(request, fixture_utils, session_cloud, setup_image): """Provide a client that runs once per class.""" with _client(request, fixture_utils, session_cloud) as client: yield client @@ -180,3 +261,20 @@ def pytest_assertrepr_compare(op, left, right): '"{}" not in cloud-init.log string; unexpectedly found on' " these lines:".format(left) ] + found_lines + + +def pytest_configure(config): + """Perform initial configuration, before the test runs start. + + This hook is only called if integration tests are being executed, so we can + use it to configure defaults for integration testing that differ from the + rest of the tests in the codebase. + + See + https://docs.pytest.org/en/latest/reference.html#_pytest.hookspec.pytest_configure + for pytest's documentation. + """ + if "log_cli_level" in config.option and not config.option.log_cli_level: + # If log_cli_level is available in this version of pytest and not set + # to anything, set it to INFO. + config.option.log_cli_level = "INFO" diff --git a/tests/integration_tests/instances.py b/tests/integration_tests/instances.py index 9b13288c..0d1e1aef 100644 --- a/tests/integration_tests/instances.py +++ b/tests/integration_tests/instances.py @@ -1,4 +1,5 @@ # This file is part of cloud-init. See LICENSE file for license information. +from enum import Enum import logging import os import uuid @@ -25,9 +26,27 @@ def _get_tmp_path(): return '/var/tmp/{}.tmp'.format(tmp_filename) -class IntegrationInstance: - use_sudo = True +class CloudInitSource(Enum): + """Represents the cloud-init image source setting as a defined value. + + Values here represent all possible values for CLOUD_INIT_SOURCE in + tests/integration_tests/integration_settings.py. See that file for an + explanation of these values. If the value set there can't be parsed into + one of these values, an exception will be raised + """ + NONE = 1 + IN_PLACE = 2 + PROPOSED = 3 + PPA = 4 + DEB_PACKAGE = 5 + + def installs_new_version(self): + if self.name in [self.NONE.name, self.IN_PLACE.name]: + return False + return True + +class IntegrationInstance: def __init__(self, cloud: 'IntegrationCloud', instance: BaseInstance, settings=integration_settings): self.cloud = cloud @@ -37,24 +56,30 @@ class IntegrationInstance: def destroy(self): self.instance.delete() - def execute(self, command, *, use_sudo=None) -> Result: + def restart(self): + """Restart this instance (via cloud mechanism) and wait for boot. + + This wraps pycloudlib's `BaseInstance.restart` + """ + log.info("Restarting instance and waiting for boot") + self.instance.restart() + + def execute(self, command, *, use_sudo=True) -> Result: if self.instance.username == 'root' and use_sudo is False: raise Exception('Root user cannot run unprivileged') - if use_sudo is None: - use_sudo = self.use_sudo return self.instance.execute(command, use_sudo=use_sudo) def pull_file(self, remote_path, local_path): # First copy to a temporary directory because of permissions issues tmp_path = _get_tmp_path() - self.instance.execute('cp {} {}'.format(remote_path, tmp_path)) - self.instance.pull_file(tmp_path, local_path) + self.instance.execute('cp {} {}'.format(str(remote_path), tmp_path)) + self.instance.pull_file(tmp_path, str(local_path)) def push_file(self, local_path, remote_path): # First push to a temporary directory because of permissions issues tmp_path = _get_tmp_path() - self.instance.push_file(local_path, tmp_path) - self.execute('mv {} {}'.format(tmp_path, remote_path)) + self.instance.push_file(str(local_path), tmp_path) + self.execute('mv {} {}'.format(tmp_path, str(remote_path))) def read_from_file(self, remote_path) -> str: result = self.execute('cat {}'.format(remote_path)) @@ -83,36 +108,52 @@ class IntegrationInstance: os.unlink(tmp_file.name) def snapshot(self): - return self.cloud.snapshot(self.instance) - - def _install_new_cloud_init(self, remote_script): - self.execute(remote_script) + image_id = self.cloud.snapshot(self.instance) + log.info('Created new image: %s', image_id) + return image_id + + def install_new_cloud_init( + self, + source: CloudInitSource, + take_snapshot=True + ): + if source == CloudInitSource.DEB_PACKAGE: + self.install_deb() + elif source == CloudInitSource.PPA: + self.install_ppa() + elif source == CloudInitSource.PROPOSED: + self.install_proposed_image() + else: + raise Exception( + "Specified to install {} which isn't supported here".format( + source) + ) version = self.execute('cloud-init -v').split()[-1] log.info('Installed cloud-init version: %s', version) self.instance.clean() - image_id = self.snapshot() - log.info('Created new image: %s', image_id) - self.cloud.image_id = image_id + if take_snapshot: + snapshot_id = self.snapshot() + self.cloud.snapshot_id = snapshot_id def install_proposed_image(self): log.info('Installing proposed image') remote_script = ( - '{sudo} echo deb "http://archive.ubuntu.com/ubuntu ' + 'echo deb "http://archive.ubuntu.com/ubuntu ' '$(lsb_release -sc)-proposed main" | ' - '{sudo} tee /etc/apt/sources.list.d/proposed.list\n' - '{sudo} apt-get update -q\n' - '{sudo} apt-get install -qy cloud-init' - ).format(sudo='sudo' if self.use_sudo else '') - self._install_new_cloud_init(remote_script) + 'tee /etc/apt/sources.list.d/proposed.list\n' + 'apt-get update -q\n' + 'apt-get install -qy cloud-init' + ) + self.execute(remote_script) - def install_ppa(self, repo): + def install_ppa(self): log.info('Installing PPA') remote_script = ( - '{sudo} add-apt-repository {repo} -y && ' - '{sudo} apt-get update -q && ' - '{sudo} apt-get install -qy cloud-init' - ).format(sudo='sudo' if self.use_sudo else '', repo=repo) - self._install_new_cloud_init(remote_script) + 'add-apt-repository {repo} -y && ' + 'apt-get update -q && ' + 'apt-get install -qy cloud-init' + ).format(repo=self.settings.CLOUD_INIT_SOURCE) + self.execute(remote_script) def install_deb(self): log.info('Installing deb package') @@ -122,9 +163,8 @@ class IntegrationInstance: self.push_file( local_path=integration_settings.CLOUD_INIT_SOURCE, remote_path=remote_path) - remote_script = '{sudo} dpkg -i {path}'.format( - sudo='sudo' if self.use_sudo else '', path=remote_path) - self._install_new_cloud_init(remote_script) + remote_script = 'dpkg -i {path}'.format(path=remote_path) + self.execute(remote_script) def __enter__(self): return self @@ -151,4 +191,4 @@ class IntegrationOciInstance(IntegrationInstance): class IntegrationLxdInstance(IntegrationInstance): - use_sudo = False + pass diff --git a/tests/integration_tests/integration_settings.py b/tests/integration_tests/integration_settings.py index a0609f7e..22b4fdda 100644 --- a/tests/integration_tests/integration_settings.py +++ b/tests/integration_tests/integration_settings.py @@ -1,15 +1,22 @@ # This file is part of cloud-init. See LICENSE file for license information. import os +from distutils.util import strtobool + ################################################################## # LAUNCH SETTINGS ################################################################## # Keep instance (mostly for debugging) when test is finished KEEP_INSTANCE = False +# Keep snapshot image (mostly for debugging) when test is finished +KEEP_IMAGE = False +# Run tests marked as unstable. Expect failures and dragons. +RUN_UNSTABLE = False # One of: # lxd_container +# lxd_vm # azure # ec2 # gce @@ -21,8 +28,11 @@ PLATFORM = 'lxd_container' INSTANCE_TYPE = None # Determines the base image to use or generate new images from. -# Can be the name of the OS if running a stock image, -# otherwise the id of the image being used if using a custom image +# +# This can be the name of an Ubuntu release, or in the format +# <image_id>[::<os>[::<release>]]. If given, os and release should describe +# the image specified by image_id. (Ubuntu releases are converted to this +# format internally; in this case, to "focal::ubuntu::focal".) OS_IMAGE = 'focal' # Populate if you want to use a pre-launched instance instead of @@ -55,23 +65,27 @@ EXISTING_INSTANCE_ID = None # A path to a valid package to be uploaded and installed CLOUD_INIT_SOURCE = 'NONE' +# Before an instance is torn down, we run `cloud-init collect-logs` +# and transfer them locally. These settings specify when to collect these +# logs and where to put them on the local filesystem +# One of: +# 'ALWAYS' +# 'ON_ERROR' +# 'NEVER' +COLLECT_LOGS = 'ON_ERROR' +LOCAL_LOG_PATH = '/tmp/cloud_init_test_logs' + ################################################################## -# GCE SPECIFIC SETTINGS +# SSH KEY SETTINGS ################################################################## -# Required for GCE -GCE_PROJECT = None -# You probably want to override these -GCE_REGION = 'us-central1' -GCE_ZONE = 'a' +# A path to the public SSH key to use for test runs. (Defaults to pycloudlib's +# default behaviour, using ~/.ssh/id_rsa.pub.) +PUBLIC_SSH_KEY = None -################################################################## -# OCI SPECIFIC SETTINGS -################################################################## -# Compartment-id found at -# https://console.us-phoenix-1.oraclecloud.com/a/identity/compartments -# Required for Oracle -OCI_COMPARTMENT_ID = None +# For clouds which use named keypairs for SSH connection, the name that is used +# for the keypair. (Defaults to pycloudlib's default behaviour.) +KEYPAIR_NAME = None ################################################################## # USER SETTINGS OVERRIDES @@ -91,6 +105,12 @@ except ImportError: # Perhaps a bit too hacky, but it works :) current_settings = [var for var in locals() if var.isupper()] for setting in current_settings: - globals()[setting] = os.getenv( + env_setting = os.getenv( 'CLOUD_INIT_{}'.format(setting), globals()[setting] ) + if isinstance(env_setting, str): + try: + env_setting = bool(strtobool(env_setting.strip())) + except ValueError: + pass + globals()[setting] = env_setting diff --git a/tests/integration_tests/log_utils.py b/tests/integration_tests/log_utils.py new file mode 100644 index 00000000..40baae7b --- /dev/null +++ b/tests/integration_tests/log_utils.py @@ -0,0 +1,11 @@ +def verify_ordered_items_in_text(to_verify: list, text: str): + """Assert all items in list appear in order in text. + + Examples: + verify_ordered_items_in_text(['a', '1'], 'ab1') # passes + verify_ordered_items_in_text(['1', 'a'], 'ab1') # raises AssertionError + """ + index = 0 + for item in to_verify: + index = text[index:].find(item) + assert index > -1, "Expected item not found: '{}'".format(item) diff --git a/tests/integration_tests/modules/test_apt_configure_sources_list.py b/tests/integration_tests/modules/test_apt_configure_sources_list.py index d2bcc61a..28cbe19f 100644 --- a/tests/integration_tests/modules/test_apt_configure_sources_list.py +++ b/tests/integration_tests/modules/test_apt_configure_sources_list.py @@ -40,6 +40,7 @@ EXPECTED_REGEXES = [ @pytest.mark.ci +@pytest.mark.ubuntu class TestAptConfigureSourcesList: @pytest.mark.user_data(USER_DATA) diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py new file mode 100644 index 00000000..89c01a9c --- /dev/null +++ b/tests/integration_tests/modules/test_ca_certs.py @@ -0,0 +1,91 @@ +"""Integration tests for cc_ca_certs. + +(This is ported from ``tests/cloud_tests//testcases/modules/ca_certs.yaml``.) + +TODO: +* Mark this as running on Debian and Alpine (once we have marks for that) +* Implement testing for the RHEL-specific paths +""" +import os.path + +import pytest + + +USER_DATA = """\ +#cloud-config +ca-certs: + remove-defaults: true + trusted: + - | + -----BEGIN CERTIFICATE----- + MIIGJzCCBA+gAwIBAgIBATANBgkqhkiG9w0BAQUFADCBsjELMAkGA1UEBhMCRlIx + DzANBgNVBAgMBkFsc2FjZTETMBEGA1UEBwwKU3RyYXNib3VyZzEYMBYGA1UECgwP + d3d3LmZyZWVsYW4ub3JnMRAwDgYDVQQLDAdmcmVlbGFuMS0wKwYDVQQDDCRGcmVl + bGFuIFNhbXBsZSBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkxIjAgBgkqhkiG9w0BCQEW + E2NvbnRhY3RAZnJlZWxhbi5vcmcwHhcNMTIwNDI3MTAzMTE4WhcNMjIwNDI1MTAz + MTE4WjB+MQswCQYDVQQGEwJGUjEPMA0GA1UECAwGQWxzYWNlMRgwFgYDVQQKDA93 + d3cuZnJlZWxhbi5vcmcxEDAOBgNVBAsMB2ZyZWVsYW4xDjAMBgNVBAMMBWFsaWNl + MSIwIAYJKoZIhvcNAQkBFhNjb250YWN0QGZyZWVsYW4ub3JnMIICIjANBgkqhkiG + 9w0BAQEFAAOCAg8AMIICCgKCAgEA3W29+ID6194bH6ejLrIC4hb2Ugo8v6ZC+Mrc + k2dNYMNPjcOKABvxxEtBamnSaeU/IY7FC/giN622LEtV/3oDcrua0+yWuVafyxmZ + yTKUb4/GUgafRQPf/eiX9urWurtIK7XgNGFNUjYPq4dSJQPPhwCHE/LKAykWnZBX + RrX0Dq4XyApNku0IpjIjEXH+8ixE12wH8wt7DEvdO7T3N3CfUbaITl1qBX+Nm2Z6 + q4Ag/u5rl8NJfXg71ZmXA3XOj7zFvpyapRIZcPmkvZYn7SMCp8dXyXHPdpSiIWL2 + uB3KiO4JrUYvt2GzLBUThp+lNSZaZ/Q3yOaAAUkOx+1h08285Pi+P8lO+H2Xic4S + vMq1xtLg2bNoPC5KnbRfuFPuUD2/3dSiiragJ6uYDLOyWJDivKGt/72OVTEPAL9o + 6T2pGZrwbQuiFGrGTMZOvWMSpQtNl+tCCXlT4mWqJDRwuMGrI4DnnGzt3IKqNwS4 + Qyo9KqjMIPwnXZAmWPm3FOKe4sFwc5fpawKO01JZewDsYTDxVj+cwXwFxbE2yBiF + z2FAHwfopwaH35p3C6lkcgP2k/zgAlnBluzACUI+MKJ/G0gv/uAhj1OHJQ3L6kn1 + SpvQ41/ueBjlunExqQSYD7GtZ1Kg8uOcq2r+WISE3Qc9MpQFFkUVllmgWGwYDuN3 + Zsez95kCAwEAAaN7MHkwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0EHxYdT3BlblNT + TCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0OBBYEFFlfyRO6G8y5qEFKikl5 + ajb2fT7XMB8GA1UdIwQYMBaAFCNsLT0+KV14uGw+quK7Lh5sh/JTMA0GCSqGSIb3 + DQEBBQUAA4ICAQAT5wJFPqervbja5+90iKxi1d0QVtVGB+z6aoAMuWK+qgi0vgvr + mu9ot2lvTSCSnRhjeiP0SIdqFMORmBtOCFk/kYDp9M/91b+vS+S9eAlxrNCB5VOf + PqxEPp/wv1rBcE4GBO/c6HcFon3F+oBYCsUQbZDKSSZxhDm3mj7pb67FNbZbJIzJ + 70HDsRe2O04oiTx+h6g6pW3cOQMgIAvFgKN5Ex727K4230B0NIdGkzuj4KSML0NM + slSAcXZ41OoSKNjy44BVEZv0ZdxTDrRM4EwJtNyggFzmtTuV02nkUj1bYYYC5f0L + ADr6s0XMyaNk8twlWYlYDZ5uKDpVRVBfiGcq0uJIzIvemhuTrofh8pBQQNkPRDFT + Rq1iTo1Ihhl3/Fl1kXk1WR3jTjNb4jHX7lIoXwpwp767HAPKGhjQ9cFbnHMEtkro + RlJYdtRq5mccDtwT0GFyoJLLBZdHHMHJz0F9H7FNk2tTQQMhK5MVYwg+LIaee586 + CQVqfbscp7evlgjLW98H+5zylRHAgoH2G79aHljNKMp9BOuq6SnEglEsiWGVtu2l + hnx8SB3sVJZHeer8f/UQQwqbAO+Kdy70NmbSaqaVtp8jOxLiidWkwSyRTsuU6D8i + DiH5uEqBXExjrj0FslxcVKdVj5glVcSmkLwZKbEU1OKwleT/iXFhvooWhQ== + -----END CERTIFICATE----- +""" + + +@pytest.mark.ubuntu +@pytest.mark.user_data(USER_DATA) +class TestCaCerts: + def test_certs_updated(self, class_client): + """Test that /etc/ssl/certs is updated as we expect.""" + root = "/etc/ssl/certs" + filenames = class_client.execute(["ls", "-1", root]).splitlines() + unlinked_files = [] + links = {} + for filename in filenames: + full_path = os.path.join(root, filename) + symlink_target = class_client.execute(["readlink", full_path]) + is_symlink = symlink_target.ok + if is_symlink: + links[filename] = symlink_target + else: + unlinked_files.append(filename) + + assert ["ca-certificates.crt"] == unlinked_files + assert "cloud-init-ca-certs.pem" == links["a535c1f3.0"] + assert ( + "/usr/share/ca-certificates/cloud-init-ca-certs.crt" + == links["cloud-init-ca-certs.pem"] + ) + + def test_cert_installed(self, class_client): + """Test that our specified cert has been installed""" + checksum = class_client.execute( + "sha256sum /etc/ssl/certs/ca-certificates.crt" + ) + assert ( + "78e875f18c73c1aab9167ae0bd323391e52222cc2dbcda42d129537219300062" + in checksum + ) diff --git a/tests/integration_tests/modules/test_cli.py b/tests/integration_tests/modules/test_cli.py new file mode 100644 index 00000000..3f41b34d --- /dev/null +++ b/tests/integration_tests/modules/test_cli.py @@ -0,0 +1,45 @@ +"""Integration tests for CLI functionality + +These would be for behavior manually invoked by user from the command line +""" + +import pytest + +from tests.integration_tests.instances import IntegrationInstance + + +VALID_USER_DATA = """\ +#cloud-config +runcmd: + - echo 'hi' > /var/tmp/test +""" + +INVALID_USER_DATA = """\ +runcmd: + - echo 'hi' > /var/tmp/test +""" + + +@pytest.mark.sru_2020_11 +@pytest.mark.user_data(VALID_USER_DATA) +def test_valid_userdata(client: IntegrationInstance): + """Test `cloud-init devel schema` with valid userdata. + + PR #575 + """ + result = client.execute('cloud-init devel schema --system') + assert result.ok + assert 'Valid cloud-config: system userdata' == result.stdout.strip() + + +@pytest.mark.sru_2020_11 +@pytest.mark.user_data(INVALID_USER_DATA) +def test_invalid_userdata(client: IntegrationInstance): + """Test `cloud-init devel schema` with invalid userdata. + + PR #575 + """ + result = client.execute('cloud-init devel schema --system') + assert not result.ok + assert 'Cloud config schema errors' in result.stderr + assert 'needs to begin with "#cloud-config"' in result.stderr diff --git a/tests/integration_tests/modules/test_lxd_bridge.py b/tests/integration_tests/modules/test_lxd_bridge.py new file mode 100644 index 00000000..cbf11179 --- /dev/null +++ b/tests/integration_tests/modules/test_lxd_bridge.py @@ -0,0 +1,48 @@ +"""Integration tests for LXD bridge creation. + +(This is ported from +``tests/cloud_tests/testcases/modules/lxd_bridge.yaml``.) +""" +import pytest +import yaml + + +USER_DATA = """\ +#cloud-config +lxd: + init: + storage_backend: dir + bridge: + mode: new + name: lxdbr0 + ipv4_address: 10.100.100.1 + ipv4_netmask: 24 + ipv4_dhcp_first: 10.100.100.100 + ipv4_dhcp_last: 10.100.100.200 + ipv4_nat: true + domain: lxd +""" + + +@pytest.mark.no_container +@pytest.mark.user_data(USER_DATA) +class TestLxdBridge: + + @pytest.mark.parametrize("binary_name", ["lxc", "lxd"]) + def test_binaries_installed(self, class_client, binary_name): + """Check that the expected LXD binaries are installed""" + assert class_client.execute(["which", binary_name]).ok + + @pytest.mark.not_xenial + @pytest.mark.sru_2020_11 + def test_bridge(self, class_client): + """Check that the given bridge is configured""" + cloud_init_log = class_client.read_from_file("/var/log/cloud-init.log") + assert "WARN" not in cloud_init_log + + # The bridge should exist + assert class_client.execute("ip addr show lxdbr0") + + raw_network_config = class_client.execute("lxc network show lxdbr0") + network_config = yaml.safe_load(raw_network_config) + assert "10.100.100.1/24" == network_config["config"]["ipv4.address"] diff --git a/tests/integration_tests/modules/test_package_update_upgrade_install.py b/tests/integration_tests/modules/test_package_update_upgrade_install.py index 8a38ad84..28d741bc 100644 --- a/tests/integration_tests/modules/test_package_update_upgrade_install.py +++ b/tests/integration_tests/modules/test_package_update_upgrade_install.py @@ -26,6 +26,7 @@ package_upgrade: true """ +@pytest.mark.ubuntu @pytest.mark.user_data(USER_DATA) class TestPackageUpdateUpgradeInstall: diff --git a/tests/integration_tests/modules/test_power_state_change.py b/tests/integration_tests/modules/test_power_state_change.py new file mode 100644 index 00000000..32dfc86d --- /dev/null +++ b/tests/integration_tests/modules/test_power_state_change.py @@ -0,0 +1,90 @@ +"""Integration test of the cc_power_state_change module. + +Test that the power state config options work as expected. +""" + +import time + +import pytest + +from tests.integration_tests.clouds import IntegrationCloud +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.log_utils import verify_ordered_items_in_text + +USER_DATA = """\ +#cloud-config +power_state: + delay: {delay} + mode: {mode} + message: msg + timeout: {timeout} + condition: {condition} +""" + + +def _detect_reboot(instance: IntegrationInstance): + # We'll wait for instance up here, but we don't know if we're + # detecting the first boot or second boot, so we also check + # the logs to ensure we've booted twice. If the logs show we've + # only booted once, wait until we've booted twice + instance.instance.wait() + for _ in range(600): + try: + log = instance.read_from_file('/var/log/cloud-init.log') + boot_count = log.count("running 'init-local'") + if boot_count == 1: + instance.instance.wait() + elif boot_count > 1: + break + except Exception: + pass + time.sleep(1) + else: + raise Exception('Could not detect reboot') + + +def _can_connect(instance): + return instance.execute('true').ok + + +# This test is marked unstable because even though it should be able to +# run anywhere, I can only get it to run in an lxd container, and even then +# occasionally some timing issues will crop up. +@pytest.mark.unstable +@pytest.mark.sru_2020_11 +@pytest.mark.ubuntu +@pytest.mark.lxd_container +class TestPowerChange: + @pytest.mark.parametrize('mode,delay,timeout,expected', [ + ('poweroff', 'now', '10', 'will execute: shutdown -P now msg'), + ('reboot', 'now', '0', 'will execute: shutdown -r now msg'), + ('halt', '+1', '0', 'will execute: shutdown -H +1 msg'), + ]) + def test_poweroff(self, session_cloud: IntegrationCloud, + mode, delay, timeout, expected): + with session_cloud.launch( + user_data=USER_DATA.format( + delay=delay, mode=mode, timeout=timeout, condition='true'), + wait=False + ) as instance: + if mode == 'reboot': + _detect_reboot(instance) + else: + instance.instance.wait_for_stop() + instance.instance.start(wait=True) + log = instance.read_from_file('/var/log/cloud-init.log') + assert _can_connect(instance) + lines_to_check = [ + 'Running module power-state-change', + expected, + "running 'init-local'", + 'config-power-state-change already ran', + ] + verify_ordered_items_in_text(lines_to_check, log) + + @pytest.mark.user_data(USER_DATA.format(delay='0', mode='poweroff', + timeout='0', condition='false')) + def test_poweroff_false_condition(self, client: IntegrationInstance): + log = client.read_from_file('/var/log/cloud-init.log') + assert _can_connect(client) + assert 'Condition was false. Will not perform state change' in log diff --git a/tests/integration_tests/modules/test_seed_random_data.py b/tests/integration_tests/modules/test_seed_random_data.py index b365fa98..f6a67c19 100644 --- a/tests/integration_tests/modules/test_seed_random_data.py +++ b/tests/integration_tests/modules/test_seed_random_data.py @@ -25,4 +25,4 @@ class TestSeedRandomData: @pytest.mark.user_data(USER_DATA) def test_seed_random_data(self, client): seed_output = client.read_from_file("/root/seed") - assert seed_output.strip() == "MYUb34023nD:LFDK10913jk;dfnk:Df" + assert seed_output.startswith("MYUb34023nD:LFDK10913jk;dfnk:Df") diff --git a/tests/integration_tests/modules/test_snap.py b/tests/integration_tests/modules/test_snap.py index b626f6b0..481edbaa 100644 --- a/tests/integration_tests/modules/test_snap.py +++ b/tests/integration_tests/modules/test_snap.py @@ -20,6 +20,7 @@ snap: @pytest.mark.ci +@pytest.mark.ubuntu class TestSnap: @pytest.mark.user_data(USER_DATA) diff --git a/tests/integration_tests/modules/test_ssh_import_id.py b/tests/integration_tests/modules/test_ssh_import_id.py index 45d37d6c..3db573b5 100644 --- a/tests/integration_tests/modules/test_ssh_import_id.py +++ b/tests/integration_tests/modules/test_ssh_import_id.py @@ -3,6 +3,10 @@ This test specifies ssh keys to be imported by the ``ssh_import_id`` module and then checks that if the ssh keys were successfully imported. +TODO: +* This test assumes that SSH keys will be imported into the /home/ubuntu; this + will need modification to run on other OSes. + (This is ported from ``tests/cloud_tests/testcases/modules/ssh_import_id.yaml``.)""" @@ -18,6 +22,7 @@ ssh_import_id: @pytest.mark.ci +@pytest.mark.ubuntu class TestSshImportId: @pytest.mark.user_data(USER_DATA) diff --git a/tests/integration_tests/modules/test_ssh_keys_provided.py b/tests/integration_tests/modules/test_ssh_keys_provided.py index 27d193c1..6aae96ae 100644 --- a/tests/integration_tests/modules/test_ssh_keys_provided.py +++ b/tests/integration_tests/modules/test_ssh_keys_provided.py @@ -83,66 +83,78 @@ ssh_keys: @pytest.mark.user_data(USER_DATA) class TestSshKeysProvided: - def test_ssh_dsa_keys_provided(self, class_client): - """Test dsa public key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_dsa_key.pub") - assert ( - "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R" - "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM") in out - - """Test dsa private key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_dsa_key") - assert ( - "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr" - "hOVAfzZ6+jklP") in out - - def test_ssh_rsa_keys_provided(self, class_client): - """Test rsa public key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key.pub") - assert ( - "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT" - "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4") in out - - """Test rsa private key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key") - assert ( - "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un" - "RQvLZpMRdywBm") in out - - def test_ssh_rsa_certificate_provided(self, class_client): - """Test rsa certificate was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key-cert.pub") - assert ( - "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg" - "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD") in out - - def test_ssh_certificate_updated_sshd_config(self, class_client): - """Test ssh certificate was added to /etc/ssh/sshd_config.""" - out = class_client.read_from_file("/etc/ssh/sshd_config").strip() - assert "HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub" in out - - def test_ssh_ecdsa_keys_provided(self, class_client): - """Test ecdsa public key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_ecdsa_key.pub") - assert ( - "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB" - "BBFsS5Tvky/IC/dXhE/afxxU") in out - - """Test ecdsa private key generated.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_ecdsa_key") - assert ( - "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY" - "5mpZqxgX4vcgb") in out - - def test_ssh_ed25519_keys_provided(self, class_client): - """Test ed25519 public key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_ed25519_key.pub") - assert ( - "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6" - "G15dqjQ2XkNVOEnb5") in out - - """Test ed25519 private key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_ed25519_key") - assert ( - "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT" - "OhteXao0Nl5DVThJ2+Q") in out + @pytest.mark.parametrize( + "config_path,expected_out", + ( + ( + "/etc/ssh/ssh_host_dsa_key.pub", + ( + "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R" + "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM" + ), + ), + ( + "/etc/ssh/ssh_host_dsa_key", + ( + "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr" + "hOVAfzZ6+jklP" + ), + ), + ( + "/etc/ssh/ssh_host_rsa_key.pub", + ( + "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT" + "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4" + ), + ), + ( + "/etc/ssh/ssh_host_rsa_key", + ( + "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un" + "RQvLZpMRdywBm" + ), + ), + ( + "/etc/ssh/ssh_host_rsa_key-cert.pub", + ( + "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg" + "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD" + ), + ), + ( + "/etc/ssh/sshd_config", + "HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub", + ), + ( + "/etc/ssh/ssh_host_ecdsa_key.pub", + ( + "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB" + "BBFsS5Tvky/IC/dXhE/afxxU" + ), + ), + ( + "/etc/ssh/ssh_host_ecdsa_key", + ( + "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY" + "5mpZqxgX4vcgb" + ), + ), + ( + "/etc/ssh/ssh_host_ed25519_key.pub", + ( + "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6" + "G15dqjQ2XkNVOEnb5" + ), + ), + ( + "/etc/ssh/ssh_host_ed25519_key", + ( + "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT" + "OhteXao0Nl5DVThJ2+Q" + ), + ), + ) + ) + def test_ssh_provided_keys(self, config_path, expected_out, class_client): + out = class_client.read_from_file(config_path).strip() + assert expected_out in out diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py index 6a51f5a6..ee08d87b 100644 --- a/tests/integration_tests/modules/test_users_groups.py +++ b/tests/integration_tests/modules/test_users_groups.py @@ -2,6 +2,10 @@ This test specifies a number of users and groups via user-data, and confirms that they have been configured correctly in the system under test. + +TODO: +* This test assumes that the "ubuntu" user will be created when "default" is + specified; this will need modification to run on other OSes. """ import re @@ -41,6 +45,7 @@ AHWYPYb2FT.lbioDm2RrkJPb9BZMN1O/ @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestUsersGroups: + @pytest.mark.ubuntu @pytest.mark.parametrize( "getent_args,regex", [ diff --git a/tests/integration_tests/test_upgrade.py b/tests/integration_tests/test_upgrade.py new file mode 100644 index 00000000..233a574b --- /dev/null +++ b/tests/integration_tests/test_upgrade.py @@ -0,0 +1,98 @@ +import logging +import pytest +import time +from pathlib import Path + +from tests.integration_tests.clouds import ImageSpecification, IntegrationCloud +from tests.integration_tests.conftest import ( + get_validated_source, + session_start_time, +) + +log = logging.getLogger('integration_testing') + +USER_DATA = """\ +#cloud-config +hostname: SRU-worked +""" + + +def _output_to_compare(instance, file_path, netcfg_path): + commands = [ + 'hostname', + 'dpkg-query --show cloud-init', + 'cat /run/cloud-init/result.json', + # 'cloud-init init' helps us understand if our pickling upgrade paths + # have broken across re-constitution of a cached datasource. Some + # platforms invalidate their datasource cache on reboot, so we run + # it here to ensure we get a dirty run. + 'cloud-init init', + 'grep Trace /var/log/cloud-init.log', + 'cloud-id', + 'cat {}'.format(netcfg_path), + 'systemd-analyze', + 'systemd-analyze blame', + 'cloud-init analyze show', + 'cloud-init analyze blame', + ] + with file_path.open('w') as f: + for command in commands: + f.write('===== {} ====='.format(command) + '\n') + f.write(instance.execute(command) + '\n') + + +def _restart(instance): + # work around pad.lv/1908287 + instance.restart() + if not instance.execute('cloud-init status --wait --long').ok: + for _ in range(10): + time.sleep(5) + result = instance.execute('cloud-init status --wait --long') + if result.ok: + return + raise Exception("Cloud-init didn't finish starting up") + + +@pytest.mark.sru_2020_11 +def test_upgrade(session_cloud: IntegrationCloud): + source = get_validated_source(session_cloud) + if not source.installs_new_version(): + pytest.skip("Install method '{}' not supported for this test".format( + source + )) + return # type checking doesn't understand that skip raises + + launch_kwargs = { + 'image_id': session_cloud._get_initial_image(), + } + + image = ImageSpecification.from_os_image() + + # Get the paths to write test logs + output_dir = Path(session_cloud.settings.LOCAL_LOG_PATH) + output_dir.mkdir(parents=True, exist_ok=True) + base_filename = 'test_upgrade_{platform}_{os}_{{stage}}_{time}.log'.format( + platform=session_cloud.settings.PLATFORM, + os=image.release, + time=session_start_time, + ) + before_path = output_dir / base_filename.format(stage='before') + after_path = output_dir / base_filename.format(stage='after') + + # Get the network cfg file + netcfg_path = '/dev/null' + if image.os == 'ubuntu': + netcfg_path = '/etc/netplan/50-cloud-init.yaml' + if image.release == 'xenial': + netcfg_path = '/etc/network/interfaces.d/50-cloud-init.cfg' + + with session_cloud.launch( + launch_kwargs=launch_kwargs, user_data=USER_DATA, wait=True, + ) as instance: + _output_to_compare(instance, before_path, netcfg_path) + instance.install_new_cloud_init(source, take_snapshot=False) + instance.execute('hostname something-else') + _restart(instance) + _output_to_compare(instance, after_path, netcfg_path) + + log.info('Wrote upgrade test logs to %s and %s', before_path, after_path) diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index e363c1f9..dc615309 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -159,6 +159,22 @@ SECONDARY_INTERFACE = { } } +SECONDARY_INTERFACE_NO_IP = { + "macAddress": "220D3A047598", + "ipv6": { + "ipAddress": [] + }, + "ipv4": { + "subnet": [ + { + "prefix": "24", + "address": "10.0.1.0" + } + ], + "ipAddress": [] + } +} + IMDS_NETWORK_METADATA = { "interface": [ { @@ -1139,6 +1155,30 @@ scbus-1 on xpt0 bus 0 dsrc.get_data() self.assertEqual(expected_network_config, dsrc.network_config) + @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', + return_value=None) + def test_network_config_set_from_imds_for_secondary_nic_no_ip( + self, m_driver): + """If an IP address is empty then there should no config for it.""" + sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} + odata = {} + data = {'ovfcontent': construct_valid_ovf_env(data=odata), + 'sys_cfg': sys_cfg} + expected_network_config = { + 'ethernets': { + 'eth0': {'set-name': 'eth0', + 'match': {'macaddress': '00:0d:3a:04:75:98'}, + 'dhcp6': False, + 'dhcp4': True, + 'dhcp4-overrides': {'route-metric': 100}}}, + 'version': 2} + imds_data = copy.deepcopy(NETWORK_METADATA) + imds_data['network']['interface'].append(SECONDARY_INTERFACE_NO_IP) + self.m_get_metadata_from_imds.return_value = imds_data + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual(expected_network_config, dsrc.network_config) + def test_availability_zone_set_from_imds(self): """Datasource.availability returns IMDS platformFaultDomain.""" sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} @@ -1757,7 +1797,9 @@ scbus-1 on xpt0 bus 0 dsrc.get_data() dsrc.setup(True) ssh_keys = dsrc.get_public_ssh_keys() - self.assertEqual(ssh_keys, ['key1']) + # Temporarily alter this test so that SSH public keys + # from IMDS are *not* going to be in use to fix a regression. + self.assertEqual(ssh_keys, []) self.assertEqual(m_parse_certificates.call_count, 0) @mock.patch(MOCKPATH + 'get_metadata_from_imds') diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py index 16773de5..dce01f5d 100644 --- a/tests/unittests/test_datasource/test_ovf.py +++ b/tests/unittests/test_datasource/test_ovf.py @@ -17,6 +17,7 @@ from cloudinit.helpers import Paths from cloudinit.sources import DataSourceOVF as dsovf from cloudinit.sources.helpers.vmware.imc.config_custom_script import ( CustomScriptNotFound) +from cloudinit.safeyaml import YAMLError MPATH = 'cloudinit.sources.DataSourceOVF.' @@ -138,16 +139,29 @@ class TestDatasourceOVF(CiTestCase): 'DEBUG: No system-product-name found', self.logs.getvalue()) def test_get_data_no_vmware_customization_disabled(self): - """When vmware customization is disabled via sys_cfg log a message.""" + """When cloud-init workflow for vmware is disabled via sys_cfg and + no meta data provided, log a message. + """ paths = Paths({'cloud_dir': self.tdir}) ds = self.datasource( sys_cfg={'disable_vmware_customization': True}, distro={}, paths=paths) + conf_file = self.tmp_path('test-cust', self.tdir) + conf_content = dedent("""\ + [CUSTOM-SCRIPT] + SCRIPT-NAME = test-script + [MISC] + MARKER-ID = 12345345 + """) + util.write_file(conf_file, conf_content) retcode = wrap_and_call( 'cloudinit.sources.DataSourceOVF', {'dmi.read_dmi_data': 'vmware', 'transport_iso9660': NOT_FOUND, - 'transport_vmware_guestinfo': NOT_FOUND}, + 'transport_vmware_guestinfo': NOT_FOUND, + 'util.del_dir': True, + 'search_file': self.tdir, + 'wait_for_imc_cfg_file': conf_file}, ds.get_data) self.assertFalse(retcode, 'Expected False return from ds.get_data') self.assertIn( @@ -344,6 +358,279 @@ class TestDatasourceOVF(CiTestCase): 'vmware (%s/seed/ovf-env.xml)' % self.tdir, ds.subplatform) + def test_get_data_cloudinit_metadata_json(self): + """Test metadata can be loaded to cloud-init metadata and network. + The metadata format is json. + """ + paths = Paths({'cloud_dir': self.tdir}) + ds = self.datasource( + sys_cfg={'disable_vmware_customization': True}, distro={}, + paths=paths) + # Prepare the conf file + conf_file = self.tmp_path('test-cust', self.tdir) + conf_content = dedent("""\ + [CLOUDINIT] + METADATA = test-meta + """) + util.write_file(conf_file, conf_content) + # Prepare the meta data file + metadata_file = self.tmp_path('test-meta', self.tdir) + metadata_content = dedent("""\ + { + "instance-id": "cloud-vm", + "local-hostname": "my-host.domain.com", + "network": { + "version": 2, + "ethernets": { + "eths": { + "match": { + "name": "ens*" + }, + "dhcp4": true + } + } + } + } + """) + util.write_file(metadata_file, metadata_content) + + with mock.patch(MPATH + 'set_customization_status', + return_value=('msg', b'')): + result = wrap_and_call( + 'cloudinit.sources.DataSourceOVF', + {'dmi.read_dmi_data': 'vmware', + 'util.del_dir': True, + 'search_file': self.tdir, + 'wait_for_imc_cfg_file': conf_file, + 'collect_imc_file_paths': [self.tdir + '/test-meta', '', ''], + 'get_nics_to_enable': ''}, + ds._get_data) + + self.assertTrue(result) + self.assertEqual("cloud-vm", ds.metadata['instance-id']) + self.assertEqual("my-host.domain.com", ds.metadata['local-hostname']) + self.assertEqual(2, ds.network_config['version']) + self.assertTrue(ds.network_config['ethernets']['eths']['dhcp4']) + + def test_get_data_cloudinit_metadata_yaml(self): + """Test metadata can be loaded to cloud-init metadata and network. + The metadata format is yaml. + """ + paths = Paths({'cloud_dir': self.tdir}) + ds = self.datasource( + sys_cfg={'disable_vmware_customization': True}, distro={}, + paths=paths) + # Prepare the conf file + conf_file = self.tmp_path('test-cust', self.tdir) + conf_content = dedent("""\ + [CLOUDINIT] + METADATA = test-meta + """) + util.write_file(conf_file, conf_content) + # Prepare the meta data file + metadata_file = self.tmp_path('test-meta', self.tdir) + metadata_content = dedent("""\ + instance-id: cloud-vm + local-hostname: my-host.domain.com + network: + version: 2 + ethernets: + nics: + match: + name: ens* + dhcp4: yes + """) + util.write_file(metadata_file, metadata_content) + + with mock.patch(MPATH + 'set_customization_status', + return_value=('msg', b'')): + result = wrap_and_call( + 'cloudinit.sources.DataSourceOVF', + {'dmi.read_dmi_data': 'vmware', + 'util.del_dir': True, + 'search_file': self.tdir, + 'wait_for_imc_cfg_file': conf_file, + 'collect_imc_file_paths': [self.tdir + '/test-meta', '', ''], + 'get_nics_to_enable': ''}, + ds._get_data) + + self.assertTrue(result) + self.assertEqual("cloud-vm", ds.metadata['instance-id']) + self.assertEqual("my-host.domain.com", ds.metadata['local-hostname']) + self.assertEqual(2, ds.network_config['version']) + self.assertTrue(ds.network_config['ethernets']['nics']['dhcp4']) + + def test_get_data_cloudinit_metadata_not_valid(self): + """Test metadata is not JSON or YAML format. + """ + paths = Paths({'cloud_dir': self.tdir}) + ds = self.datasource( + sys_cfg={'disable_vmware_customization': True}, distro={}, + paths=paths) + + # Prepare the conf file + conf_file = self.tmp_path('test-cust', self.tdir) + conf_content = dedent("""\ + [CLOUDINIT] + METADATA = test-meta + """) + util.write_file(conf_file, conf_content) + + # Prepare the meta data file + metadata_file = self.tmp_path('test-meta', self.tdir) + metadata_content = "[This is not json or yaml format]a=b" + util.write_file(metadata_file, metadata_content) + + with mock.patch(MPATH + 'set_customization_status', + return_value=('msg', b'')): + with self.assertRaises(YAMLError) as context: + wrap_and_call( + 'cloudinit.sources.DataSourceOVF', + {'dmi.read_dmi_data': 'vmware', + 'util.del_dir': True, + 'search_file': self.tdir, + 'wait_for_imc_cfg_file': conf_file, + 'collect_imc_file_paths': [ + self.tdir + '/test-meta', '', '' + ], + 'get_nics_to_enable': ''}, + ds.get_data) + + self.assertIn("expected '<document start>', but found '<scalar>'", + str(context.exception)) + + def test_get_data_cloudinit_metadata_not_found(self): + """Test metadata file can't be found. + """ + paths = Paths({'cloud_dir': self.tdir}) + ds = self.datasource( + sys_cfg={'disable_vmware_customization': True}, distro={}, + paths=paths) + # Prepare the conf file + conf_file = self.tmp_path('test-cust', self.tdir) + conf_content = dedent("""\ + [CLOUDINIT] + METADATA = test-meta + """) + util.write_file(conf_file, conf_content) + # Don't prepare the meta data file + + with mock.patch(MPATH + 'set_customization_status', + return_value=('msg', b'')): + with self.assertRaises(FileNotFoundError) as context: + wrap_and_call( + 'cloudinit.sources.DataSourceOVF', + {'dmi.read_dmi_data': 'vmware', + 'util.del_dir': True, + 'search_file': self.tdir, + 'wait_for_imc_cfg_file': conf_file, + 'get_nics_to_enable': ''}, + ds.get_data) + + self.assertIn('is not found', str(context.exception)) + + def test_get_data_cloudinit_userdata(self): + """Test user data can be loaded to cloud-init user data. + """ + paths = Paths({'cloud_dir': self.tdir}) + ds = self.datasource( + sys_cfg={'disable_vmware_customization': False}, distro={}, + paths=paths) + + # Prepare the conf file + conf_file = self.tmp_path('test-cust', self.tdir) + conf_content = dedent("""\ + [CLOUDINIT] + METADATA = test-meta + USERDATA = test-user + """) + util.write_file(conf_file, conf_content) + + # Prepare the meta data file + metadata_file = self.tmp_path('test-meta', self.tdir) + metadata_content = dedent("""\ + instance-id: cloud-vm + local-hostname: my-host.domain.com + network: + version: 2 + ethernets: + nics: + match: + name: ens* + dhcp4: yes + """) + util.write_file(metadata_file, metadata_content) + + # Prepare the user data file + userdata_file = self.tmp_path('test-user', self.tdir) + userdata_content = "This is the user data" + util.write_file(userdata_file, userdata_content) + + with mock.patch(MPATH + 'set_customization_status', + return_value=('msg', b'')): + result = wrap_and_call( + 'cloudinit.sources.DataSourceOVF', + {'dmi.read_dmi_data': 'vmware', + 'util.del_dir': True, + 'search_file': self.tdir, + 'wait_for_imc_cfg_file': conf_file, + 'collect_imc_file_paths': [self.tdir + '/test-meta', + self.tdir + '/test-user', ''], + 'get_nics_to_enable': ''}, + ds._get_data) + + self.assertTrue(result) + self.assertEqual("cloud-vm", ds.metadata['instance-id']) + self.assertEqual(userdata_content, ds.userdata_raw) + + def test_get_data_cloudinit_userdata_not_found(self): + """Test userdata file can't be found. + """ + paths = Paths({'cloud_dir': self.tdir}) + ds = self.datasource( + sys_cfg={'disable_vmware_customization': True}, distro={}, + paths=paths) + + # Prepare the conf file + conf_file = self.tmp_path('test-cust', self.tdir) + conf_content = dedent("""\ + [CLOUDINIT] + METADATA = test-meta + USERDATA = test-user + """) + util.write_file(conf_file, conf_content) + + # Prepare the meta data file + metadata_file = self.tmp_path('test-meta', self.tdir) + metadata_content = dedent("""\ + instance-id: cloud-vm + local-hostname: my-host.domain.com + network: + version: 2 + ethernets: + nics: + match: + name: ens* + dhcp4: yes + """) + util.write_file(metadata_file, metadata_content) + + # Don't prepare the user data file + + with mock.patch(MPATH + 'set_customization_status', + return_value=('msg', b'')): + with self.assertRaises(FileNotFoundError) as context: + wrap_and_call( + 'cloudinit.sources.DataSourceOVF', + {'dmi.read_dmi_data': 'vmware', + 'util.del_dir': True, + 'search_file': self.tdir, + 'wait_for_imc_cfg_file': conf_file, + 'get_nics_to_enable': ''}, + ds.get_data) + + self.assertIn('is not found', str(context.exception)) + class TestTransportIso9660(CiTestCase): diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index e74a0a08..6e3831ed 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -47,12 +47,20 @@ class TestConfig(TestCase): def setUp(self): super(TestConfig, self).setUp() self.name = "ca-certs" - distro = self._fetch_distro('ubuntu') self.paths = None - self.cloud = cloud.Cloud(None, self.paths, None, distro, None) self.log = logging.getLogger("TestNoConfig") self.args = [] + def _fetch_distro(self, kind): + cls = distros.fetch(kind) + paths = helpers.Paths({}) + return cls(kind, {}, paths) + + def _get_cloud(self, kind): + distro = self._fetch_distro(kind) + return cloud.Cloud(None, self.paths, None, distro, None) + + def _mock_init(self): self.mocks = ExitStack() self.addCleanup(self.mocks.close) @@ -64,11 +72,6 @@ class TestConfig(TestCase): self.mock_remove = self.mocks.enter_context( mock.patch.object(cc_ca_certs, 'remove_default_ca_certs')) - def _fetch_distro(self, kind): - cls = distros.fetch(kind) - paths = helpers.Paths({}) - return cls(kind, {}, paths) - def test_no_trusted_list(self): """ Test that no certificates are written if the 'trusted' key is not @@ -76,71 +79,95 @@ class TestConfig(TestCase): """ config = {"ca-certs": {}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro_name in cc_ca_certs.distros: + self._mock_init() + cloud = self._get_cloud(distro_name) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) def test_empty_trusted_list(self): """Test that no certificate are written if 'trusted' list is empty.""" config = {"ca-certs": {"trusted": []}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro_name in cc_ca_certs.distros: + self._mock_init() + cloud = self._get_cloud(distro_name) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) def test_single_trusted(self): """Test that a single cert gets passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1"]}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro_name in cc_ca_certs.distros: + self._mock_init() + cloud = self._get_cloud(distro_name) + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(['CERT1']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.mock_add.assert_called_once_with(conf, ['CERT1']) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) def test_multiple_trusted(self): """Test that multiple certs get passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro_name in cc_ca_certs.distros: + self._mock_init() + cloud = self._get_cloud(distro_name) + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(['CERT1', 'CERT2']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.mock_add.assert_called_once_with(conf, ['CERT1', 'CERT2']) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) def test_remove_default_ca_certs(self): """Test remove_defaults works as expected.""" config = {"ca-certs": {"remove-defaults": True}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro_name in cc_ca_certs.distros: + self._mock_init() + cloud = self._get_cloud(distro_name) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 1) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 1) def test_no_remove_defaults_if_false(self): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": False}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro_name in cc_ca_certs.distros: + self._mock_init() + cloud = self._get_cloud(distro_name) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.assertEqual(self.mock_add.call_count, 0) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 0) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) def test_correct_order_for_remove_then_add(self): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + for distro_name in cc_ca_certs.distros: + self._mock_init() + cloud = self._get_cloud(distro_name) + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) + cc_ca_certs.handle(self.name, config, cloud, self.log, self.args) - self.mock_add.assert_called_once_with(['CERT1']) - self.assertEqual(self.mock_update.call_count, 1) - self.assertEqual(self.mock_remove.call_count, 1) + self.mock_add.assert_called_once_with(conf, ['CERT1']) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 1) class TestAddCaCerts(TestCase): @@ -152,12 +179,20 @@ class TestAddCaCerts(TestCase): self.paths = helpers.Paths({ 'cloud_dir': tmpdir, }) + self.add_patch("cloudinit.config.cc_ca_certs.os.stat", "m_stat") + + def _fetch_distro(self, kind): + cls = distros.fetch(kind) + paths = helpers.Paths({}) + return cls(kind, {}, paths) def test_no_certs_in_list(self): """Test that no certificate are written if not provided.""" - with mock.patch.object(util, 'write_file') as mockobj: - cc_ca_certs.add_ca_certs([]) - self.assertEqual(mockobj.call_count, 0) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) + with mock.patch.object(util, 'write_file') as mockobj: + cc_ca_certs.add_ca_certs(conf, []) + self.assertEqual(mockobj.call_count, 0) def test_single_cert_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -167,20 +202,28 @@ class TestAddCaCerts(TestCase): ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n" expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + self.m_stat.return_value.st_size = 1 - cc_ca_certs.add_ca_certs([cert]) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", expected, omode="wb")]) - mock_load.assert_called_once_with("/etc/ca-certificates.conf") + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) + + cc_ca_certs.add_ca_certs(conf, [cert]) + + mock_write.assert_has_calls([ + mock.call(conf['ca_cert_full_path'], + cert, mode=0o644)]) + if conf['ca_cert_config'] is not None: + mock_write.assert_has_calls([ + mock.call(conf['ca_cert_config'], + expected, omode="wb")]) + mock_load.assert_called_once_with(conf['ca_cert_config']) def test_single_cert_no_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -189,24 +232,31 @@ class TestAddCaCerts(TestCase): ca_certs_content = "line1\nline2\nline3" - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + self.m_stat.return_value.st_size = 1 + + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) - cc_ca_certs.add_ca_certs([cert]) + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", - "%s\n%s\n" % (ca_certs_content, - "cloud-init-ca-certs.crt"), - omode="wb")]) + cc_ca_certs.add_ca_certs(conf, [cert]) - mock_load.assert_called_once_with("/etc/ca-certificates.conf") + mock_write.assert_has_calls([ + mock.call(conf['ca_cert_full_path'], + cert, mode=0o644)]) + if conf['ca_cert_config'] is not None: + mock_write.assert_has_calls([ + mock.call(conf['ca_cert_config'], + "%s\n%s\n" % (ca_certs_content, + conf['ca_cert_filename']), + omode="wb")]) + + mock_load.assert_called_once_with(conf['ca_cert_config']) def test_single_cert_to_empty_existing_ca_file(self): """Test adding a single certificate to the trusted CAs @@ -215,20 +265,22 @@ class TestAddCaCerts(TestCase): expected = "cloud-init-ca-certs.crt\n" - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file', autospec=True)) - mock_stat = mocks.enter_context( - mock.patch("cloudinit.config.cc_ca_certs.os.stat") - ) - mock_stat.return_value.st_size = 0 + self.m_stat.return_value.st_size = 0 - cc_ca_certs.add_ca_certs([cert]) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) + with mock.patch.object(util, 'write_file', + autospec=True) as m_write: - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", expected, omode="wb")]) + cc_ca_certs.add_ca_certs(conf, [cert]) + + m_write.assert_has_calls([ + mock.call(conf['ca_cert_full_path'], + cert, mode=0o644)]) + if conf['ca_cert_config'] is not None: + m_write.assert_has_calls([ + mock.call(conf['ca_cert_config'], + expected, omode="wb")]) def test_multiple_certs(self): """Test adding multiple certificates to the trusted CAs.""" @@ -236,32 +288,41 @@ class TestAddCaCerts(TestCase): expected_cert_file = "\n".join(certs) ca_certs_content = "line1\nline2\nline3" - with ExitStack() as mocks: - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_load = mocks.enter_context( - mock.patch.object(util, 'load_file', - return_value=ca_certs_content)) + self.m_stat.return_value.st_size = 1 + + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) + + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) - cc_ca_certs.add_ca_certs(certs) + cc_ca_certs.add_ca_certs(conf, certs) - mock_write.assert_has_calls([ - mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - expected_cert_file, mode=0o644), - mock.call("/etc/ca-certificates.conf", - "%s\n%s\n" % (ca_certs_content, - "cloud-init-ca-certs.crt"), - omode='wb')]) + mock_write.assert_has_calls([ + mock.call(conf['ca_cert_full_path'], + expected_cert_file, mode=0o644)]) + if conf['ca_cert_config'] is not None: + mock_write.assert_has_calls([ + mock.call(conf['ca_cert_config'], + "%s\n%s\n" % (ca_certs_content, + conf['ca_cert_filename']), + omode='wb')]) - mock_load.assert_called_once_with("/etc/ca-certificates.conf") + mock_load.assert_called_once_with(conf['ca_cert_config']) class TestUpdateCaCerts(unittest.TestCase): def test_commands(self): - with mock.patch.object(subp, 'subp') as mockobj: - cc_ca_certs.update_ca_certs() - mockobj.assert_called_once_with( - ["update-ca-certificates"], capture=False) + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) + with mock.patch.object(subp, 'subp') as mockobj: + cc_ca_certs.update_ca_certs(conf) + mockobj.assert_called_once_with( + conf['ca_cert_update_cmd'], capture=False) class TestRemoveDefaultCaCerts(TestCase): @@ -275,24 +336,31 @@ class TestRemoveDefaultCaCerts(TestCase): }) def test_commands(self): - with ExitStack() as mocks: - mock_delete = mocks.enter_context( - mock.patch.object(util, 'delete_dir_contents')) - mock_write = mocks.enter_context( - mock.patch.object(util, 'write_file')) - mock_subp = mocks.enter_context(mock.patch.object(subp, 'subp')) - - cc_ca_certs.remove_default_ca_certs('ubuntu') - - mock_delete.assert_has_calls([ - mock.call("/usr/share/ca-certificates/"), - mock.call("/etc/ssl/certs/")]) - - mock_write.assert_called_once_with( - "/etc/ca-certificates.conf", "", mode=0o644) - - mock_subp.assert_called_once_with( - ('debconf-set-selections', '-'), - "ca-certificates ca-certificates/trust_new_crts select no") + for distro_name in cc_ca_certs.distros: + conf = cc_ca_certs._distro_ca_certs_configs(distro_name) + + with ExitStack() as mocks: + mock_delete = mocks.enter_context( + mock.patch.object(util, 'delete_dir_contents')) + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_subp = mocks.enter_context( + mock.patch.object(subp, 'subp')) + + cc_ca_certs.remove_default_ca_certs(distro_name, conf) + + mock_delete.assert_has_calls([ + mock.call(conf['ca_cert_path']), + mock.call(conf['ca_cert_system_path'])]) + + if conf['ca_cert_config'] is not None: + mock_write.assert_called_once_with( + conf['ca_cert_config'], "", mode=0o644) + + if distro_name in ['debian', 'ubuntu']: + mock_subp.assert_called_once_with( + ('debconf-set-selections', '-'), + "ca-certificates \ +ca-certificates/trust_new_crts select no") # vi: ts=4 expandtab diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py index 70453683..38d934d4 100644 --- a/tests/unittests/test_net.py +++ b/tests/unittests/test_net.py @@ -1365,10 +1365,11 @@ NETWORK_CONFIGS = { }, 'expected_sysconfig_rhel': { 'ifcfg-iface0': textwrap.dedent("""\ - BOOTPROTO=none + BOOTPROTO=dhcp DEVICE=iface0 DHCPV6C=yes IPV6INIT=yes + IPV6_AUTOCONF=no IPV6_FORCE_ACCEPT_RA=yes DEVICE=iface0 NM_CONTROLLED=no @@ -4819,6 +4820,9 @@ class TestEniRoundTrip(CiTestCase): {'type': 'route', 'id': 6, 'metric': 1, 'destination': '10.0.200.0/16', 'gateway': '172.23.31.1'}, + {'type': 'route', 'id': 7, + 'metric': 1, 'destination': '10.0.0.100/32', + 'gateway': '172.23.31.1'}, ] files = self._render_and_read( @@ -4842,6 +4846,10 @@ class TestEniRoundTrip(CiTestCase): '172.23.31.1 metric 1 || true'), ('pre-down route del -net 10.0.200.0/16 gw ' '172.23.31.1 metric 1 || true'), + ('post-up route add -host 10.0.0.100/32 gw ' + '172.23.31.1 metric 1 || true'), + ('pre-down route del -host 10.0.0.100/32 gw ' + '172.23.31.1 metric 1 || true'), ] found = files['/etc/network/interfaces'].splitlines() diff --git a/tests/unittests/test_vmware_config_file.py b/tests/unittests/test_vmware_config_file.py index 9c7d25fa..430cc69f 100644 --- a/tests/unittests/test_vmware_config_file.py +++ b/tests/unittests/test_vmware_config_file.py @@ -525,5 +525,21 @@ class TestVmwareNetConfig(CiTestCase): 'gateway': '10.20.87.253'}]}], nc.generate()) + def test_meta_data(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + conf = Config(cf) + self.assertIsNone(conf.meta_data_name) + cf._insertKey("CLOUDINIT|METADATA", "test-metadata") + conf = Config(cf) + self.assertEqual("test-metadata", conf.meta_data_name) + + def test_user_data(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + conf = Config(cf) + self.assertIsNone(conf.user_data_name) + cf._insertKey("CLOUDINIT|USERDATA", "test-userdata") + conf = Config(cf) + self.assertEqual("test-userdata", conf.user_data_name) + # vi: ts=4 expandtab |