summaryrefslogtreecommitdiff
path: root/tests/integration_tests/modules
diff options
context:
space:
mode:
Diffstat (limited to 'tests/integration_tests/modules')
-rw-r--r--tests/integration_tests/modules/test_apt.py354
-rw-r--r--tests/integration_tests/modules/test_apt_configure_sources_list.py51
-rw-r--r--tests/integration_tests/modules/test_ca_certs.py90
-rw-r--r--tests/integration_tests/modules/test_cli.py81
-rw-r--r--tests/integration_tests/modules/test_combined.py342
-rw-r--r--tests/integration_tests/modules/test_command_output.py21
-rw-r--r--tests/integration_tests/modules/test_disk_setup.py212
-rw-r--r--tests/integration_tests/modules/test_growpart.py68
-rw-r--r--tests/integration_tests/modules/test_hotplug.py112
-rw-r--r--tests/integration_tests/modules/test_jinja_templating.py33
-rw-r--r--tests/integration_tests/modules/test_keyboard.py17
-rw-r--r--tests/integration_tests/modules/test_keys_to_console.py113
-rw-r--r--tests/integration_tests/modules/test_lxd_bridge.py46
-rw-r--r--tests/integration_tests/modules/test_ntp_servers.py98
-rw-r--r--tests/integration_tests/modules/test_package_update_upgrade_install.py19
-rw-r--r--tests/integration_tests/modules/test_persistence.py32
-rw-r--r--tests/integration_tests/modules/test_power_state_change.py97
-rw-r--r--tests/integration_tests/modules/test_puppet.py39
-rw-r--r--tests/integration_tests/modules/test_runcmd.py25
-rw-r--r--tests/integration_tests/modules/test_seed_random_data.py28
-rw-r--r--tests/integration_tests/modules/test_set_hostname.py27
-rw-r--r--tests/integration_tests/modules/test_set_password.py57
-rw-r--r--tests/integration_tests/modules/test_snap.py29
-rw-r--r--tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py18
-rw-r--r--tests/integration_tests/modules/test_ssh_generate.py16
-rw-r--r--tests/integration_tests/modules/test_ssh_import_id.py29
-rw-r--r--tests/integration_tests/modules/test_ssh_keys_provided.py122
-rw-r--r--tests/integration_tests/modules/test_ssh_keysfile.py224
-rw-r--r--tests/integration_tests/modules/test_timezone.py25
-rw-r--r--tests/integration_tests/modules/test_user_events.py110
-rw-r--r--tests/integration_tests/modules/test_users_groups.py50
-rw-r--r--tests/integration_tests/modules/test_version_change.py76
-rw-r--r--tests/integration_tests/modules/test_write_files.py47
33 files changed, 2393 insertions, 315 deletions
diff --git a/tests/integration_tests/modules/test_apt.py b/tests/integration_tests/modules/test_apt.py
new file mode 100644
index 00000000..adab46a8
--- /dev/null
+++ b/tests/integration_tests/modules/test_apt.py
@@ -0,0 +1,354 @@
+"""Series of integration tests covering apt functionality."""
+import re
+
+import pytest
+
+from cloudinit import gpg
+from cloudinit.config import cc_apt_configure
+from tests.integration_tests.clouds import ImageSpecification
+from tests.integration_tests.instances import IntegrationInstance
+
+USER_DATA = """\
+#cloud-config
+apt:
+ conf: |
+ APT {
+ Get {
+ Assume-Yes "true";
+ Fix-Broken "true";
+ }
+ }
+ primary:
+ - arches: [default]
+ uri: http://badarchive.ubuntu.com/ubuntu
+ security:
+ - arches: [default]
+ uri: http://badsecurity.ubuntu.com/ubuntu
+ sources_list: |
+ deb $MIRROR $RELEASE main restricted
+ deb-src $MIRROR $RELEASE main restricted
+ deb $PRIMARY $RELEASE universe restricted
+ deb-src $PRIMARY $RELEASE universe restricted
+ deb $SECURITY $RELEASE-security multiverse
+ deb-src $SECURITY $RELEASE-security multiverse
+ sources:
+ test_keyserver:
+ keyid: 110E21D8B0E2A1F0243AF6820856F197B892ACEA
+ keyserver: keyserver.ubuntu.com
+ source: "deb http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu $RELEASE main"
+ test_ppa:
+ keyid: 441614D8
+ keyserver: keyserver.ubuntu.com
+ source: "ppa:simplestreams-dev/trunk"
+ test_signed_by:
+ keyid: A2EB2DEC0BD7519B7B38BE38376A290EC8068B11
+ keyserver: keyserver.ubuntu.com
+ source: "deb [signed-by=$KEY_FILE] http://ppa.launchpad.net/juju/stable/ubuntu $RELEASE main"
+ test_bad_key:
+ key: ""
+ source: "deb $MIRROR $RELEASE main"
+ test_key:
+ source: "deb http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu $RELEASE main"
+ key: |
+ -----BEGIN PGP PUBLIC KEY BLOCK-----
+ Version: SKS 1.1.6
+ Comment: Hostname: keyserver.ubuntu.com
+
+ mQINBFbZRUIBEAC+A0PIKYBP9kLC4hQtRrffRS11uLo8/BdtmOdrlW0hpPHzCfKnjR3tvSEI
+ lqPHG1QrrjAXKZDnZMRz+h/px7lUztvytGzHPSJd5ARUzAyjyRezUhoJ3VSCxrPqx62avuWf
+ RfoJaIeHfDehL5/dTVkyiWxfVZ369ZX6JN2AgLsQTeybTQ75+2z0xPrrhnGmgh6g0qTYcAaq
+ M5ONOGiqeSBX/Smjh6ALy5XkhUiFGLsI7Yluf6XSICY/x7gd6RAfgSIQrUTNMoS1sqhT4aot
+ +xvOfQy8ySkfAK4NddXql6E/+ZqTmBY/Lr0YklFBy8jGT+UysfiIznPMIwbmgq5Li7BtDDtX
+ b8Uyi4edPpjtextezfXYn4NVIpPL5dPZS/FXh4HpzyH0pYCfrH4QDGA7i52AGmhpiOFjJMo6
+ N33sdjZHOH/2Vyp+QZaQnsdUAi1N4M6c33tQbpIScn1SY+El8z5JDA4PBzkw8HpLCi1gGoa6
+ V4kfbWqXXbGAJFkLkP/vc4+pY9axOlmCkJg7xCPwhI75y1cONgovhz+BEXOzolh5KZuGbGbj
+ xe0wva5DLBeIg7EQFf+99pOS7Syby3Xpm6ZbswEFV0cllK4jf/QMjtfInxobuMoI0GV0bE5l
+ WlRtPCK5FnbHwxi0wPNzB/5fwzJ77r6HgPrR0OkT0lWmbUyoOQARAQABtC1MYXVuY2hwYWQg
+ UFBBIGZvciBjbG91ZCBpbml0IGRldmVsb3BtZW50IHRlYW2JAjgEEwECACIFAlbZRUICGwMG
+ CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEAg9Bvvk0wTfHfcP/REK5N2s1JYc69qEa9ZN
+ o6oi+A7l6AYw+ZY88O5TJe7F9otv5VXCIKSUT0Vsepjgf0mtXAgf/sb2lsJn/jp7tzgov3YH
+ vSrkTkRydz8xcA87gwQKePuvTLxQpftF4flrBxgSueIn5O/tPrBOxLz7EVYBc78SKg9aj9L2
+ yUp+YuNevlwfZCTYeBb9r3FHaab2HcgkwqYch66+nKYfwiLuQ9NzXXm0Wn0JcEQ6pWvJscbj
+ C9BdawWovfvMK5/YLfI6Btm7F4mIpQBdhSOUp/YXKmdvHpmwxMCN2QhqYK49SM7qE9aUDbJL
+ arppSEBtlCLWhRBZYLTUna+BkuQ1bHz4St++XTR49Qd7vDERALpApDjB2dxPfMiBzCMwQQyq
+ uy13exU8o2ETLg+dZSLfDTzrBNsBFmXlw8WW17nTISYdKeGKL+QdlUjpzdwUMMzHhAO8SmMH
+ zjeSlDSRMXBJFAFSbCl7EwmMKa3yVX0zInT91fNllZ3iatAmtVdqVH/BFQfTIMH2ET7A8WzJ
+ ZzVSuMRhqoKdr5AMcHuJGPUoVkVJHQA+NNvEiXSysF3faL7jmKapmUwrhpYYX2H8pf+VMu2e
+ cLflKTI28dl+ZQ4Pl/aVsxrti/pzhdYy05Sn5ddtySyIkvo8L1cU5MWpbvSlFPkTstBUDLBf
+ pb0uBy+g0oxJQg15
+ =uy53
+ -----END PGP PUBLIC KEY BLOCK-----
+apt_pipelining: os
+""" # noqa: E501
+
+EXPECTED_REGEXES = [
+ r"deb http://badarchive.ubuntu.com/ubuntu [a-z]+ main restricted",
+ r"deb-src http://badarchive.ubuntu.com/ubuntu [a-z]+ main restricted",
+ r"deb http://badarchive.ubuntu.com/ubuntu [a-z]+ universe restricted",
+ r"deb-src http://badarchive.ubuntu.com/ubuntu [a-z]+ universe restricted",
+ r"deb http://badsecurity.ubuntu.com/ubuntu [a-z]+-security multiverse",
+ r"deb-src http://badsecurity.ubuntu.com/ubuntu [a-z]+-security multiverse",
+]
+
+TEST_KEYSERVER_KEY = "110E 21D8 B0E2 A1F0 243A F682 0856 F197 B892 ACEA"
+TEST_PPA_KEY = "3552 C902 B4DD F7BD 3842 1821 015D 28D7 4416 14D8"
+TEST_KEY = "1FF0 D853 5EF7 E719 E5C8 1B9C 083D 06FB E4D3 04DF"
+TEST_SIGNED_BY_KEY = "A2EB 2DEC 0BD7 519B 7B38 BE38 376A 290E C806 8B11"
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(USER_DATA)
+class TestApt:
+ def get_keys(self, class_client: IntegrationInstance):
+ """Return all keys in /etc/apt/trusted.gpg.d/ and /etc/apt/trusted.gpg
+ in human readable format. Mimics the output of apt-key finger
+ """
+ list_cmd = " ".join(gpg.GPG_LIST) + " "
+ keys = class_client.execute(list_cmd + cc_apt_configure.APT_LOCAL_KEYS)
+ print(keys)
+ files = class_client.execute(
+ "ls " + cc_apt_configure.APT_TRUSTED_GPG_DIR
+ )
+ for file in files.split():
+ path = cc_apt_configure.APT_TRUSTED_GPG_DIR + file
+ keys += class_client.execute(list_cmd + path) or ""
+ return keys
+
+ def test_sources_list(self, class_client: IntegrationInstance):
+ """Integration test for the apt module's `sources_list` functionality.
+
+ This test specifies a ``sources_list`` and then checks that (a) the
+ expected number of sources.list entries is present, and (b) that each
+ expected line appears in the file.
+
+ (This is ported from
+ `tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml`.)
+ """
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
+ assert 6 == len(sources_list.rstrip().split("\n"))
+
+ for expected_re in EXPECTED_REGEXES:
+ assert re.search(expected_re, sources_list) is not None
+
+ def test_apt_conf(self, class_client: IntegrationInstance):
+ """Test the apt conf functionality.
+
+ Ported from tests/cloud_tests/testcases/modules/apt_configure_conf.py
+ """
+ apt_config = class_client.read_from_file(
+ "/etc/apt/apt.conf.d/94cloud-init-config"
+ )
+ assert 'Assume-Yes "true";' in apt_config
+ assert 'Fix-Broken "true";' in apt_config
+
+ def test_ppa_source(self, class_client: IntegrationInstance):
+ """Test the apt ppa functionality.
+
+ Ported from
+ tests/cloud_tests/testcases/modules/apt_configure_sources_ppa.py
+ """
+ release = ImageSpecification.from_os_image().release
+ ppa_path_contents = class_client.read_from_file(
+ "/etc/apt/sources.list.d/"
+ "simplestreams-dev-ubuntu-trunk-{}.list".format(release)
+ )
+
+ assert (
+ "http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu"
+ in ppa_path_contents
+ )
+
+ assert TEST_PPA_KEY in self.get_keys(class_client)
+
+ def test_signed_by(self, class_client: IntegrationInstance):
+ """Test the apt signed-by functionality."""
+ release = ImageSpecification.from_os_image().release
+ source = (
+ "deb [signed-by=/etc/apt/cloud-init.gpg.d/test_signed_by.gpg] "
+ "http://ppa.launchpad.net/juju/stable/ubuntu"
+ " {} main".format(release)
+ )
+ path_contents = class_client.read_from_file(
+ "/etc/apt/sources.list.d/test_signed_by.list"
+ )
+ assert path_contents == source
+
+ key = class_client.execute(
+ "gpg --no-default-keyring --with-fingerprint --list-keys "
+ "--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg"
+ )
+
+ assert TEST_SIGNED_BY_KEY in key
+
+ def test_bad_key(self, class_client: IntegrationInstance):
+ """Test the apt signed-by functionality."""
+ with pytest.raises(OSError):
+ class_client.read_from_file(
+ "/etc/apt/trusted.list.d/test_bad_key.gpg"
+ )
+
+ def test_key(self, class_client: IntegrationInstance):
+ """Test the apt key functionality.
+
+ Ported from
+ tests/cloud_tests/testcases/modules/apt_configure_sources_key.py
+ """
+ test_archive_contents = class_client.read_from_file(
+ "/etc/apt/sources.list.d/test_key.list"
+ )
+
+ assert (
+ "http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu"
+ in test_archive_contents
+ )
+ assert TEST_KEY in self.get_keys(class_client)
+
+ def test_keyserver(self, class_client: IntegrationInstance):
+ """Test the apt keyserver functionality.
+
+ Ported from
+ tests/cloud_tests/testcases/modules/apt_configure_sources_keyserver.py
+ """
+ test_keyserver_contents = class_client.read_from_file(
+ "/etc/apt/sources.list.d/test_keyserver.list"
+ )
+
+ assert (
+ "http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu"
+ in test_keyserver_contents
+ )
+
+ assert TEST_KEYSERVER_KEY in self.get_keys(class_client)
+
+ def test_os_pipelining(self, class_client: IntegrationInstance):
+ """Test 'os' settings does not write apt config file.
+
+ Ported from tests/cloud_tests/testcases/modules/apt_pipelining_os.py
+ """
+ conf_exists = class_client.execute(
+ "test -f /etc/apt/apt.conf.d/90cloud-init-pipelining"
+ ).ok
+ assert conf_exists is False
+
+
+_DEFAULT_DATA = """\
+#cloud-config
+apt:
+ primary:
+ - arches:
+ - default
+ {uri}
+ security:
+ - arches:
+ - default
+"""
+DEFAULT_DATA = _DEFAULT_DATA.format(uri="")
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(DEFAULT_DATA)
+class TestDefaults:
+ @pytest.mark.openstack
+ def test_primary_on_openstack(self, class_client: IntegrationInstance):
+ """Test apt default primary source on openstack.
+
+ When no uri is provided.
+ """
+ zone = class_client.execute("cloud-init query v1.availability_zone")
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
+ assert "{}.clouds.archive.ubuntu.com".format(zone) in sources_list
+
+ def test_security(self, class_client: IntegrationInstance):
+ """Test apt default security sources.
+
+ Ported from
+ tests/cloud_tests/testcases/modules/apt_configure_security.py
+ """
+ sources_list = class_client.read_from_file("/etc/apt/sources.list")
+
+ # 3 lines from main, universe, and multiverse
+ sec_url = "deb http://security.ubuntu.com/ubuntu"
+ if class_client.settings.PLATFORM == "azure":
+ sec_url = (
+ "deb http://azure.archive.ubuntu.com/ubuntu/ jammy-security"
+ )
+ sec_src_url = sec_url.replace("deb ", "# deb-src ")
+ assert 3 == sources_list.count(sec_url)
+ assert 3 == sources_list.count(sec_src_url)
+
+
+DEFAULT_DATA_WITH_URI = _DEFAULT_DATA.format(
+ uri='uri: "http://something.random.invalid/ubuntu"'
+)
+
+
+@pytest.mark.user_data(DEFAULT_DATA_WITH_URI)
+def test_default_primary_with_uri(client: IntegrationInstance):
+ """Test apt default primary sources.
+
+ Ported from
+ tests/cloud_tests/testcases/modules/apt_configure_primary.py
+ """
+ sources_list = client.read_from_file("/etc/apt/sources.list")
+ assert "archive.ubuntu.com" not in sources_list
+
+ assert "something.random.invalid" in sources_list
+
+
+DISABLED_DATA = """\
+#cloud-config
+apt:
+ disable_suites:
+ - $RELEASE
+ - $RELEASE-updates
+ - $RELEASE-backports
+ - $RELEASE-security
+apt_pipelining: false
+"""
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(DISABLED_DATA)
+class TestDisabled:
+ def test_disable_suites(self, class_client: IntegrationInstance):
+ """Test disabling of apt suites.
+
+ Ported from
+ tests/cloud_tests/testcases/modules/apt_configure_disable_suites.py
+ """
+ sources_list = class_client.execute(
+ "cat /etc/apt/sources.list | grep -v '^#'"
+ ).strip()
+ assert "" == sources_list
+
+ def test_disable_apt_pipelining(self, class_client: IntegrationInstance):
+ """Test disabling of apt pipelining.
+
+ Ported from
+ tests/cloud_tests/testcases/modules/apt_pipelining_disable.py
+ """
+ conf = class_client.read_from_file(
+ "/etc/apt/apt.conf.d/90cloud-init-pipelining"
+ )
+ assert 'Acquire::http::Pipeline-Depth "0";' in conf
+
+
+APT_PROXY_DATA = """\
+#cloud-config
+apt:
+ proxy: "http://proxy.internal:3128"
+ http_proxy: "http://squid.internal:3128"
+ ftp_proxy: "ftp://squid.internal:3128"
+ https_proxy: "https://squid.internal:3128"
+"""
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(APT_PROXY_DATA)
+def test_apt_proxy(client: IntegrationInstance):
+ """Test the apt proxy data gets written correctly."""
+ out = client.read_from_file("/etc/apt/apt.conf.d/90cloud-init-aptproxy")
+ assert 'Acquire::http::Proxy "http://proxy.internal:3128";' in out
+ assert 'Acquire::http::Proxy "http://squid.internal:3128";' in out
+ assert 'Acquire::ftp::Proxy "ftp://squid.internal:3128";' in out
+ assert 'Acquire::https::Proxy "https://squid.internal:3128";' in out
diff --git a/tests/integration_tests/modules/test_apt_configure_sources_list.py b/tests/integration_tests/modules/test_apt_configure_sources_list.py
deleted file mode 100644
index d2bcc61a..00000000
--- a/tests/integration_tests/modules/test_apt_configure_sources_list.py
+++ /dev/null
@@ -1,51 +0,0 @@
-"""Integration test for the apt module's ``sources_list`` functionality.
-
-This test specifies a ``sources_list`` and then checks that (a) the expected
-number of sources.list entries is present, and (b) that each expected line
-appears in the file.
-
-(This is ported from
-``tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml``.)"""
-import re
-
-import pytest
-
-
-USER_DATA = """\
-#cloud-config
-apt:
- primary:
- - arches: [default]
- uri: http://archive.ubuntu.com/ubuntu
- security:
- - arches: [default]
- uri: http://security.ubuntu.com/ubuntu
- sources_list: |
- deb $MIRROR $RELEASE main restricted
- deb-src $MIRROR $RELEASE main restricted
- deb $PRIMARY $RELEASE universe restricted
- deb-src $PRIMARY $RELEASE universe restricted
- deb $SECURITY $RELEASE-security multiverse
- deb-src $SECURITY $RELEASE-security multiverse
-"""
-
-EXPECTED_REGEXES = [
- r"deb http://archive.ubuntu.com/ubuntu [a-z].* main restricted",
- r"deb-src http://archive.ubuntu.com/ubuntu [a-z].* main restricted",
- r"deb http://archive.ubuntu.com/ubuntu [a-z].* universe restricted",
- r"deb-src http://archive.ubuntu.com/ubuntu [a-z].* universe restricted",
- r"deb http://security.ubuntu.com/ubuntu [a-z].*security multiverse",
- r"deb-src http://security.ubuntu.com/ubuntu [a-z].*security multiverse",
-]
-
-
-@pytest.mark.ci
-class TestAptConfigureSourcesList:
-
- @pytest.mark.user_data(USER_DATA)
- def test_sources_list(self, client):
- sources_list = client.read_from_file("/etc/apt/sources.list")
- assert 6 == len(sources_list.rstrip().split('\n'))
-
- for expected_re in EXPECTED_REGEXES:
- assert re.search(expected_re, sources_list) is not None
diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py
new file mode 100644
index 00000000..7247fd7d
--- /dev/null
+++ b/tests/integration_tests/modules/test_ca_certs.py
@@ -0,0 +1,90 @@
+"""Integration tests for cc_ca_certs.
+
+(This is ported from ``tests/cloud_tests//testcases/modules/ca_certs.yaml``.)
+
+TODO:
+* Mark this as running on Debian and Alpine (once we have marks for that)
+* Implement testing for the RHEL-specific paths
+"""
+import os.path
+
+import pytest
+
+USER_DATA = """\
+#cloud-config
+ca_certs:
+ remove_defaults: true
+ trusted:
+ - |
+ -----BEGIN CERTIFICATE-----
+ MIIGJzCCBA+gAwIBAgIBATANBgkqhkiG9w0BAQUFADCBsjELMAkGA1UEBhMCRlIx
+ DzANBgNVBAgMBkFsc2FjZTETMBEGA1UEBwwKU3RyYXNib3VyZzEYMBYGA1UECgwP
+ d3d3LmZyZWVsYW4ub3JnMRAwDgYDVQQLDAdmcmVlbGFuMS0wKwYDVQQDDCRGcmVl
+ bGFuIFNhbXBsZSBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkxIjAgBgkqhkiG9w0BCQEW
+ E2NvbnRhY3RAZnJlZWxhbi5vcmcwHhcNMTIwNDI3MTAzMTE4WhcNMjIwNDI1MTAz
+ MTE4WjB+MQswCQYDVQQGEwJGUjEPMA0GA1UECAwGQWxzYWNlMRgwFgYDVQQKDA93
+ d3cuZnJlZWxhbi5vcmcxEDAOBgNVBAsMB2ZyZWVsYW4xDjAMBgNVBAMMBWFsaWNl
+ MSIwIAYJKoZIhvcNAQkBFhNjb250YWN0QGZyZWVsYW4ub3JnMIICIjANBgkqhkiG
+ 9w0BAQEFAAOCAg8AMIICCgKCAgEA3W29+ID6194bH6ejLrIC4hb2Ugo8v6ZC+Mrc
+ k2dNYMNPjcOKABvxxEtBamnSaeU/IY7FC/giN622LEtV/3oDcrua0+yWuVafyxmZ
+ yTKUb4/GUgafRQPf/eiX9urWurtIK7XgNGFNUjYPq4dSJQPPhwCHE/LKAykWnZBX
+ RrX0Dq4XyApNku0IpjIjEXH+8ixE12wH8wt7DEvdO7T3N3CfUbaITl1qBX+Nm2Z6
+ q4Ag/u5rl8NJfXg71ZmXA3XOj7zFvpyapRIZcPmkvZYn7SMCp8dXyXHPdpSiIWL2
+ uB3KiO4JrUYvt2GzLBUThp+lNSZaZ/Q3yOaAAUkOx+1h08285Pi+P8lO+H2Xic4S
+ vMq1xtLg2bNoPC5KnbRfuFPuUD2/3dSiiragJ6uYDLOyWJDivKGt/72OVTEPAL9o
+ 6T2pGZrwbQuiFGrGTMZOvWMSpQtNl+tCCXlT4mWqJDRwuMGrI4DnnGzt3IKqNwS4
+ Qyo9KqjMIPwnXZAmWPm3FOKe4sFwc5fpawKO01JZewDsYTDxVj+cwXwFxbE2yBiF
+ z2FAHwfopwaH35p3C6lkcgP2k/zgAlnBluzACUI+MKJ/G0gv/uAhj1OHJQ3L6kn1
+ SpvQ41/ueBjlunExqQSYD7GtZ1Kg8uOcq2r+WISE3Qc9MpQFFkUVllmgWGwYDuN3
+ Zsez95kCAwEAAaN7MHkwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0EHxYdT3BlblNT
+ TCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0OBBYEFFlfyRO6G8y5qEFKikl5
+ ajb2fT7XMB8GA1UdIwQYMBaAFCNsLT0+KV14uGw+quK7Lh5sh/JTMA0GCSqGSIb3
+ DQEBBQUAA4ICAQAT5wJFPqervbja5+90iKxi1d0QVtVGB+z6aoAMuWK+qgi0vgvr
+ mu9ot2lvTSCSnRhjeiP0SIdqFMORmBtOCFk/kYDp9M/91b+vS+S9eAlxrNCB5VOf
+ PqxEPp/wv1rBcE4GBO/c6HcFon3F+oBYCsUQbZDKSSZxhDm3mj7pb67FNbZbJIzJ
+ 70HDsRe2O04oiTx+h6g6pW3cOQMgIAvFgKN5Ex727K4230B0NIdGkzuj4KSML0NM
+ slSAcXZ41OoSKNjy44BVEZv0ZdxTDrRM4EwJtNyggFzmtTuV02nkUj1bYYYC5f0L
+ ADr6s0XMyaNk8twlWYlYDZ5uKDpVRVBfiGcq0uJIzIvemhuTrofh8pBQQNkPRDFT
+ Rq1iTo1Ihhl3/Fl1kXk1WR3jTjNb4jHX7lIoXwpwp767HAPKGhjQ9cFbnHMEtkro
+ RlJYdtRq5mccDtwT0GFyoJLLBZdHHMHJz0F9H7FNk2tTQQMhK5MVYwg+LIaee586
+ CQVqfbscp7evlgjLW98H+5zylRHAgoH2G79aHljNKMp9BOuq6SnEglEsiWGVtu2l
+ hnx8SB3sVJZHeer8f/UQQwqbAO+Kdy70NmbSaqaVtp8jOxLiidWkwSyRTsuU6D8i
+ DiH5uEqBXExjrj0FslxcVKdVj5glVcSmkLwZKbEU1OKwleT/iXFhvooWhQ==
+ -----END CERTIFICATE-----
+"""
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(USER_DATA)
+class TestCaCerts:
+ def test_certs_updated(self, class_client):
+ """Test that /etc/ssl/certs is updated as we expect."""
+ root = "/etc/ssl/certs"
+ filenames = class_client.execute(["ls", "-1", root]).splitlines()
+ unlinked_files = []
+ links = {}
+ for filename in filenames:
+ full_path = os.path.join(root, filename)
+ symlink_target = class_client.execute(["readlink", full_path])
+ is_symlink = symlink_target.ok
+ if is_symlink:
+ links[filename] = symlink_target
+ else:
+ unlinked_files.append(filename)
+
+ assert ["ca-certificates.crt"] == unlinked_files
+ assert "cloud-init-ca-certs.pem" == links["a535c1f3.0"]
+ assert (
+ "/usr/share/ca-certificates/cloud-init-ca-certs.crt"
+ == links["cloud-init-ca-certs.pem"]
+ )
+
+ def test_cert_installed(self, class_client):
+ """Test that our specified cert has been installed"""
+ checksum = class_client.execute(
+ "sha256sum /etc/ssl/certs/ca-certificates.crt"
+ )
+ assert (
+ "78e875f18c73c1aab9167ae0bd323391e52222cc2dbcda42d129537219300062"
+ in checksum
+ )
diff --git a/tests/integration_tests/modules/test_cli.py b/tests/integration_tests/modules/test_cli.py
new file mode 100644
index 00000000..baaa7567
--- /dev/null
+++ b/tests/integration_tests/modules/test_cli.py
@@ -0,0 +1,81 @@
+"""Integration tests for CLI functionality
+
+These would be for behavior manually invoked by user from the command line
+"""
+
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+
+VALID_USER_DATA = """\
+#cloud-config
+runcmd:
+ - echo 'hi' > /var/tmp/test
+"""
+
+INVALID_USER_DATA_HEADER = """\
+runcmd:
+ - echo 'hi' > /var/tmp/test
+"""
+
+INVALID_USER_DATA_SCHEMA = """\
+#cloud-config
+updates:
+ notnetwork: -1
+apt_pipelining: bogus
+"""
+
+
+@pytest.mark.user_data(VALID_USER_DATA)
+def test_valid_userdata(client: IntegrationInstance):
+ """Test `cloud-init devel schema` with valid userdata.
+
+ PR #575
+ """
+ result = client.execute("cloud-init devel schema --system")
+ assert result.ok
+ assert "Valid cloud-config: system userdata" == result.stdout.strip()
+ result = client.execute("cloud-init status --long")
+ if not result.ok:
+ raise AssertionError(
+ f"Unexpected error from cloud-init status: {result}"
+ )
+
+
+@pytest.mark.user_data(INVALID_USER_DATA_HEADER)
+def test_invalid_userdata(client: IntegrationInstance):
+ """Test `cloud-init devel schema` with invalid userdata.
+
+ PR #575
+ """
+ result = client.execute("cloud-init devel schema --system")
+ assert not result.ok
+ assert "Cloud config schema errors" in result.stderr
+ assert 'needs to begin with "#cloud-config"' in result.stderr
+ result = client.execute("cloud-init status --long")
+ if not result.ok:
+ raise AssertionError(
+ f"Unexpected error from cloud-init status: {result}"
+ )
+
+
+@pytest.mark.user_data(INVALID_USER_DATA_SCHEMA)
+def test_invalid_userdata_schema(client: IntegrationInstance):
+ """Test invalid schema represented as Warnings, not fatal
+
+ PR #1175
+ """
+ result = client.execute("cloud-init status --long")
+ assert result.ok
+ log = client.read_from_file("/var/log/cloud-init.log")
+ warning = (
+ "[WARNING]: Invalid cloud-config provided:\napt_pipelining: 'bogus'"
+ " is not valid under any of the given schemas\nupdates: Additional"
+ " properties are not allowed ('notnetwork' was unexpected)"
+ )
+ assert warning in log
+ result = client.execute("cloud-init status --long")
+ if not result.ok:
+ raise AssertionError(
+ f"Unexpected error from cloud-init status: {result}"
+ )
diff --git a/tests/integration_tests/modules/test_combined.py b/tests/integration_tests/modules/test_combined.py
new file mode 100644
index 00000000..7a9a6e27
--- /dev/null
+++ b/tests/integration_tests/modules/test_combined.py
@@ -0,0 +1,342 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+"""A set of somewhat unrelated tests that can be combined into a single
+instance launch. Generally tests should only be added here if a failure
+of the test would be unlikely to affect the running of another test using
+the same instance launch. Most independent module coherence tests can go
+here.
+"""
+import json
+import re
+
+import pytest
+
+from tests.integration_tests.clouds import ImageSpecification
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.util import (
+ retry,
+ verify_clean_log,
+ verify_ordered_items_in_text,
+)
+
+USER_DATA = """\
+#cloud-config
+apt:
+ primary:
+ - arches: [default]
+ uri: http://us.archive.ubuntu.com/ubuntu/
+byobu_by_default: enable
+final_message: |
+ This is my final message!
+ $version
+ $timestamp
+ $datasource
+ $uptime
+locale: en_GB.UTF-8
+locale_configfile: /etc/default/locale
+ntp:
+ servers: ['ntp.ubuntu.com']
+package_update: true
+random_seed:
+ data: 'MYUb34023nD:LFDK10913jk;dfnk:Df'
+ encoding: raw
+ file: /root/seed
+rsyslog:
+ configs:
+ - "*.* @@127.0.0.1"
+ - filename: 0-basic-config.conf
+ content: |
+ module(load="imtcp")
+ input(type="imtcp" port="514")
+ $template RemoteLogs,"/var/tmp/rsyslog.log"
+ *.* ?RemoteLogs
+ & ~
+ remotes:
+ me: "127.0.0.1"
+runcmd:
+ - echo 'hello world' > /var/tmp/runcmd_output
+
+ - #
+ - logger "My test log"
+snap:
+ squashfuse_in_container: true
+ commands:
+ - snap install hello-world
+ssh_import_id:
+ - gh:powersj
+ - lp:smoser
+timezone: US/Aleutian
+"""
+
+
+@pytest.mark.ci
+@pytest.mark.user_data(USER_DATA)
+class TestCombined:
+ def test_final_message(self, class_client: IntegrationInstance):
+ """Test that final_message module works as expected.
+
+ Also tests LP 1511485: final_message is silent.
+ """
+ client = class_client
+ log = client.read_from_file("/var/log/cloud-init.log")
+ expected = (
+ "This is my final message!\n"
+ r"\d+\.\d+.*\n"
+ r"\w{3}, \d{2} \w{3} \d{4} \d{2}:\d{2}:\d{2} \+\d{4}\n" # Datetime
+ "DataSource.*\n"
+ r"\d+\.\d+"
+ )
+
+ assert re.search(expected, log)
+
+ def test_ntp_with_apt(self, class_client: IntegrationInstance):
+ """LP #1628337.
+
+ cloud-init tries to install NTP before even
+ configuring the archives.
+ """
+ client = class_client
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "W: Failed to fetch" not in log
+ assert "W: Some index files failed to download" not in log
+ assert "E: Unable to locate package ntp" not in log
+
+ def test_byobu(self, class_client: IntegrationInstance):
+ """Test byobu configured as enabled by default."""
+ client = class_client
+ assert client.execute('test -e "/etc/byobu/autolaunch"').ok
+
+ def test_configured_locale(self, class_client: IntegrationInstance):
+ """Test locale can be configured correctly."""
+ client = class_client
+ default_locale = client.read_from_file("/etc/default/locale")
+ assert "LANG=en_GB.UTF-8" in default_locale
+
+ locale_a = client.execute("locale -a")
+ verify_ordered_items_in_text(["en_GB.utf8", "en_US.utf8"], locale_a)
+
+ locale_gen = client.execute(
+ "cat /etc/locale.gen | grep -v '^#' | uniq"
+ )
+ verify_ordered_items_in_text(
+ ["en_GB.UTF-8", "en_US.UTF-8"], locale_gen
+ )
+
+ def test_random_seed_data(self, class_client: IntegrationInstance):
+ """Integration test for the random seed module.
+
+ This test specifies a command to be executed by the ``seed_random``
+ module, by providing a different data to be used as seed data. We will
+ then check if that seed data was actually used.
+ """
+ client = class_client
+
+ # Only read the first 31 characters, because the rest could be
+ # binary data
+ result = client.execute("head -c 31 < /root/seed")
+ assert result.startswith("MYUb34023nD:LFDK10913jk;dfnk:Df")
+
+ def test_rsyslog(self, class_client: IntegrationInstance):
+ """Test rsyslog is configured correctly."""
+ client = class_client
+ assert "My test log" in client.read_from_file("/var/tmp/rsyslog.log")
+
+ def test_runcmd(self, class_client: IntegrationInstance):
+ """Test runcmd works as expected"""
+ client = class_client
+ assert "hello world" == client.read_from_file("/var/tmp/runcmd_output")
+
+ @retry(tries=30, delay=1)
+ def test_ssh_import_id(self, class_client: IntegrationInstance):
+ """Integration test for the ssh_import_id module.
+
+ This test specifies ssh keys to be imported by the ``ssh_import_id``
+ module and then checks that if the ssh keys were successfully imported.
+
+ TODO:
+ * This test assumes that SSH keys will be imported into the
+ /home/ubuntu; this will need modification to run on other OSes.
+ """
+ client = class_client
+ ssh_output = client.read_from_file("/home/ubuntu/.ssh/authorized_keys")
+
+ assert "# ssh-import-id gh:powersj" in ssh_output
+ assert "# ssh-import-id lp:smoser" in ssh_output
+
+ def test_snap(self, class_client: IntegrationInstance):
+ """Integration test for the snap module.
+
+ This test specifies a command to be executed by the ``snap`` module
+ and then checks that if that command was executed during boot.
+ """
+ client = class_client
+ snap_output = client.execute("snap list")
+ assert "core " in snap_output
+ assert "hello-world " in snap_output
+
+ def test_timezone(self, class_client: IntegrationInstance):
+ """Integration test for the timezone module.
+
+ This test specifies a timezone to be used by the ``timezone`` module
+ and then checks that if that timezone was respected during boot.
+ """
+ client = class_client
+ timezone_output = client.execute(
+ 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"'
+ )
+ assert timezone_output.strip() == "HDT"
+
+ def test_no_problems(self, class_client: IntegrationInstance):
+ """Test no errors, warnings, or tracebacks"""
+ client = class_client
+ status_file = client.read_from_file("/run/cloud-init/status.json")
+ status_json = json.loads(status_file)["v1"]
+ for stage in ("init", "init-local", "modules-config", "modules-final"):
+ assert status_json[stage]["errors"] == []
+ result_file = client.read_from_file("/run/cloud-init/result.json")
+ result_json = json.loads(result_file)["v1"]
+ assert result_json["errors"] == []
+
+ log = client.read_from_file("/var/log/cloud-init.log")
+ verify_clean_log(log)
+
+ def test_correct_datasource_detected(
+ self, class_client: IntegrationInstance
+ ):
+ """Test datasource is detected at the proper boot stage."""
+ client = class_client
+ status_file = client.read_from_file("/run/cloud-init/status.json")
+ parsed_datasource = json.loads(status_file)["v1"]["datasource"]
+
+ if client.settings.PLATFORM in ["lxd_container", "lxd_vm"]:
+ assert parsed_datasource.startswith("DataSourceNoCloud")
+ else:
+ platform_datasources = {
+ "azure": "DataSourceAzure [seed=/dev/sr0]",
+ "ec2": "DataSourceEc2Local",
+ "gce": "DataSourceGCELocal",
+ "oci": "DataSourceOracle",
+ "openstack": "DataSourceOpenStackLocal [net,ver=2]",
+ }
+ assert (
+ platform_datasources[client.settings.PLATFORM]
+ == parsed_datasource
+ )
+
+ def test_cloud_id_file_symlink(self, class_client: IntegrationInstance):
+ cloud_id = class_client.execute("cloud-id").stdout
+ expected_link_output = (
+ "'/run/cloud-init/cloud-id' -> "
+ f"'/run/cloud-init/cloud-id-{cloud_id}'"
+ )
+ assert expected_link_output == str(
+ class_client.execute("stat -c %N /run/cloud-init/cloud-id")
+ )
+
+ def _check_common_metadata(self, data):
+ assert data["base64_encoded_keys"] == []
+ assert data["merged_cfg"] == "redacted for non-root user"
+
+ image_spec = ImageSpecification.from_os_image()
+ assert data["sys_info"]["dist"][0] == image_spec.os
+
+ v1_data = data["v1"]
+ assert re.match(r"\d\.\d+\.\d+-\d+", v1_data["kernel_release"])
+ assert v1_data["variant"] == image_spec.os
+ assert v1_data["distro"] == image_spec.os
+ assert v1_data["distro_release"] == image_spec.release
+ assert v1_data["machine"] == "x86_64"
+ assert re.match(r"3.\d\.\d", v1_data["python_version"])
+
+ @pytest.mark.lxd_container
+ def test_instance_json_lxd(self, class_client: IntegrationInstance):
+ client = class_client
+ instance_json_file = client.read_from_file(
+ "/run/cloud-init/instance-data.json"
+ )
+
+ data = json.loads(instance_json_file)
+ self._check_common_metadata(data)
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "unknown"
+ assert v1_data["platform"] == "lxd"
+ assert v1_data["cloud_id"] == "lxd"
+ assert f"{v1_data['cloud_id']}" == client.read_from_file(
+ "/run/cloud-init/cloud-id-lxd"
+ )
+ assert (
+ v1_data["subplatform"]
+ == "seed-dir (/var/lib/cloud/seed/nocloud-net)"
+ )
+ assert v1_data["availability_zone"] is None
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"] == client.instance.name
+ assert v1_data["region"] is None
+
+ @pytest.mark.lxd_vm
+ def test_instance_json_lxd_vm(self, class_client: IntegrationInstance):
+ client = class_client
+ instance_json_file = client.read_from_file(
+ "/run/cloud-init/instance-data.json"
+ )
+
+ data = json.loads(instance_json_file)
+ self._check_common_metadata(data)
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "unknown"
+ assert v1_data["platform"] == "lxd"
+ assert v1_data["cloud_id"] == "lxd"
+ assert f"{v1_data['cloud_id']}" == client.read_from_file(
+ "/run/cloud-init/cloud-id-lxd"
+ )
+ assert any(
+ [
+ "/var/lib/cloud/seed/nocloud-net" in v1_data["subplatform"],
+ "/dev/sr0" in v1_data["subplatform"],
+ ]
+ )
+ assert v1_data["availability_zone"] is None
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"] == client.instance.name
+ assert v1_data["region"] is None
+
+ @pytest.mark.ec2
+ def test_instance_json_ec2(self, class_client: IntegrationInstance):
+ client = class_client
+ instance_json_file = client.read_from_file(
+ "/run/cloud-init/instance-data.json"
+ )
+ data = json.loads(instance_json_file)
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "aws"
+ assert v1_data["platform"] == "ec2"
+ # Different regions will show up as ec2-(gov|china)
+ assert v1_data["cloud_id"].startswith("ec2")
+ assert f"{v1_data['cloud_id']}" == client.read_from_file(
+ "/run/cloud-init/cloud-id-ec2"
+ )
+ assert v1_data["subplatform"].startswith("metadata")
+ assert (
+ v1_data["availability_zone"] == client.instance.availability_zone
+ )
+ assert v1_data["instance_id"] == client.instance.name
+ assert v1_data["local_hostname"].startswith("ip-")
+ assert v1_data["region"] == client.cloud.cloud_instance.region
+
+ @pytest.mark.gce
+ def test_instance_json_gce(self, class_client: IntegrationInstance):
+ client = class_client
+ instance_json_file = client.read_from_file(
+ "/run/cloud-init/instance-data.json"
+ )
+ data = json.loads(instance_json_file)
+ self._check_common_metadata(data)
+ v1_data = data["v1"]
+ assert v1_data["cloud_name"] == "gce"
+ assert v1_data["platform"] == "gce"
+ assert f"{v1_data['cloud_id']}" == client.read_from_file(
+ "/run/cloud-init/cloud-id-gce"
+ )
+ assert v1_data["subplatform"].startswith("metadata")
+ assert v1_data["availability_zone"] == client.instance.zone
+ assert v1_data["instance_id"] == client.instance.instance_id
+ assert v1_data["local_hostname"] == client.instance.name
diff --git a/tests/integration_tests/modules/test_command_output.py b/tests/integration_tests/modules/test_command_output.py
new file mode 100644
index 00000000..96525cac
--- /dev/null
+++ b/tests/integration_tests/modules/test_command_output.py
@@ -0,0 +1,21 @@
+"""Integration test for output redirection.
+
+This test redirects the output of a command to a file and then checks the file.
+
+(This is ported from
+``tests/cloud_tests/testcases/main/command_output_simple.yaml``.)"""
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+
+USER_DATA = """\
+#cloud-config
+output: { all: "| tee -a /var/log/cloud-init-test-output" }
+final_message: "should be last line in cloud-init-test-output file"
+"""
+
+
+@pytest.mark.user_data(USER_DATA)
+def test_command_output(client: IntegrationInstance):
+ log = client.read_from_file("/var/log/cloud-init-test-output")
+ assert "should be last line in cloud-init-test-output file" in log
diff --git a/tests/integration_tests/modules/test_disk_setup.py b/tests/integration_tests/modules/test_disk_setup.py
new file mode 100644
index 00000000..7aaba7db
--- /dev/null
+++ b/tests/integration_tests/modules/test_disk_setup.py
@@ -0,0 +1,212 @@
+import json
+import os
+from uuid import uuid4
+
+import pytest
+from pycloudlib.lxd.instance import LXDInstance
+
+from cloudinit.subp import subp
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.util import verify_clean_log
+
+DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4())
+
+
+def setup_and_mount_lxd_disk(instance: LXDInstance):
+ subp(
+ "lxc config device add {} test-disk-setup-disk disk source={}".format(
+ instance.name, DISK_PATH
+ ).split()
+ )
+
+
+@pytest.fixture
+def create_disk():
+ # 640k should be enough for anybody
+ subp("dd if=/dev/zero of={} bs=1k count=640".format(DISK_PATH).split())
+ yield
+ os.remove(DISK_PATH)
+
+
+ALIAS_USERDATA = """\
+#cloud-config
+device_aliases:
+ my_alias: /dev/sdb
+disk_setup:
+ my_alias:
+ table_type: mbr
+ layout: [50, 50]
+ overwrite: True
+fs_setup:
+- label: fs1
+ device: my_alias.1
+ filesystem: ext4
+- label: fs2
+ device: my_alias.2
+ filesystem: ext4
+mounts:
+- ["my_alias.1", "/mnt1"]
+- ["my_alias.2", "/mnt2"]
+"""
+
+
+@pytest.mark.user_data(ALIAS_USERDATA)
+@pytest.mark.lxd_setup.with_args(setup_and_mount_lxd_disk)
+@pytest.mark.ubuntu
+@pytest.mark.lxd_vm
+class TestDeviceAliases:
+ """Test devices aliases work on disk setup/mount"""
+
+ def test_device_alias(self, create_disk, client: IntegrationInstance):
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert (
+ "updated disk_setup device entry 'my_alias' to '/dev/sdb'" in log
+ )
+ assert "changed my_alias.1 => /dev/sdb1" in log
+ assert "changed my_alias.2 => /dev/sdb2" in log
+ verify_clean_log(log)
+
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 2
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["children"][1]["name"] == "sdb2"
+ if "mountpoint" in sdb["children"][0]:
+ assert sdb["children"][0]["mountpoint"] == "/mnt1"
+ assert sdb["children"][1]["mountpoint"] == "/mnt2"
+ else:
+ assert sdb["children"][0]["mountpoints"] == ["/mnt1"]
+ assert sdb["children"][1]["mountpoints"] == ["/mnt2"]
+ result = client.execute("mount -a")
+ assert result.return_code == 0
+ assert result.stdout.strip() == ""
+ assert result.stderr.strip() == ""
+ result = client.execute("findmnt -J /mnt1")
+ assert result.return_code == 0
+ result = client.execute("findmnt -J /mnt2")
+ assert result.return_code == 0
+
+
+PARTPROBE_USERDATA = """\
+#cloud-config
+disk_setup:
+ /dev/sdb:
+ table_type: mbr
+ layout: [50, 50]
+ overwrite: True
+fs_setup:
+ - label: test
+ device: /dev/sdb1
+ filesystem: ext4
+ - label: test2
+ device: /dev/sdb2
+ filesystem: ext4
+mounts:
+- ["/dev/sdb1", "/mnt1"]
+- ["/dev/sdb2", "/mnt2"]
+"""
+
+UPDATED_PARTPROBE_USERDATA = """\
+#cloud-config
+disk_setup:
+ /dev/sdb:
+ table_type: mbr
+ layout: [100]
+ overwrite: True
+fs_setup:
+ - label: test3
+ device: /dev/sdb1
+ filesystem: ext4
+mounts:
+- ["/dev/sdb1", "/mnt3"]
+"""
+
+
+@pytest.mark.user_data(PARTPROBE_USERDATA)
+@pytest.mark.lxd_setup.with_args(setup_and_mount_lxd_disk)
+@pytest.mark.ubuntu
+@pytest.mark.lxd_vm
+class TestPartProbeAvailability:
+ """Test disk setup works with partprobe
+
+ Disk setup can run successfully on a mounted partition when
+ partprobe is being used.
+
+ lp-1920939
+ """
+
+ def _verify_first_disk_setup(self, client, log):
+ verify_clean_log(log)
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 2
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["children"][1]["name"] == "sdb2"
+ if "mountpoint" in sdb["children"][0]:
+ assert sdb["children"][0]["mountpoint"] == "/mnt1"
+ assert sdb["children"][1]["mountpoint"] == "/mnt2"
+ else:
+ assert sdb["children"][0]["mountpoints"] == ["/mnt1"]
+ assert sdb["children"][1]["mountpoints"] == ["/mnt2"]
+
+ # Not bionic because the LXD agent gets in the way of us
+ # changing the userdata
+ @pytest.mark.not_bionic
+ def test_disk_setup_when_mounted(
+ self, create_disk, client: IntegrationInstance
+ ):
+ """Test lp-1920939.
+
+ We insert an extra disk into our VM, format it to have two partitions,
+ modify our cloud config to mount devices before disk setup, and modify
+ our userdata to setup a single partition on the disk.
+
+ This allows cloud-init to attempt disk setup on a mounted partition.
+ When blockdev is in use, it will fail with
+ "blockdev: ioctl error on BLKRRPART: Device or resource busy" along
+ with a warning and a traceback. When partprobe is in use, everything
+ should work successfully.
+ """
+ log = client.read_from_file("/var/log/cloud-init.log")
+ self._verify_first_disk_setup(client, log)
+
+ # Update our userdata and cloud.cfg to mount then perform new disk
+ # setup
+ client.write_to_file(
+ "/var/lib/cloud/seed/nocloud-net/user-data",
+ UPDATED_PARTPROBE_USERDATA,
+ )
+ client.execute(
+ "sed -i 's/write-files/write-files\\n - mounts/' "
+ "/etc/cloud/cloud.cfg"
+ )
+
+ client.execute("cloud-init clean --logs")
+ client.restart()
+
+ # Assert new setup works as expected
+        verify_clean_log(client.read_from_file("/var/log/cloud-init.log"))
+
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 1
+ assert sdb["children"][0]["name"] == "sdb1"
+ if "mountpoint" in sdb["children"][0]:
+ assert sdb["children"][0]["mountpoint"] == "/mnt3"
+ else:
+ assert sdb["children"][0]["mountpoints"] == ["/mnt3"]
+
+ def test_disk_setup_no_partprobe(
+ self, create_disk, client: IntegrationInstance
+ ):
+ """Ensure disk setup still works as expected without partprobe."""
+ # We can't do this part in a bootcmd because the path has already
+ # been found by the time we get to the bootcmd
+ client.execute("rm $(which partprobe)")
+ client.execute("cloud-init clean --logs")
+ client.restart()
+
+ log = client.read_from_file("/var/log/cloud-init.log")
+ self._verify_first_disk_setup(client, log)
+
+ assert "partprobe" not in log
diff --git a/tests/integration_tests/modules/test_growpart.py b/tests/integration_tests/modules/test_growpart.py
new file mode 100644
index 00000000..67251817
--- /dev/null
+++ b/tests/integration_tests/modules/test_growpart.py
@@ -0,0 +1,68 @@
+import json
+import os
+import pathlib
+from uuid import uuid4
+
+import pytest
+from pycloudlib.lxd.instance import LXDInstance
+
+from cloudinit.subp import subp
+from tests.integration_tests.instances import IntegrationInstance
+
+DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4())
+
+
+def setup_and_mount_lxd_disk(instance: LXDInstance):
+ subp(
+ "lxc config device add {} test-disk-setup-disk disk source={}".format(
+ instance.name, DISK_PATH
+ ).split()
+ )
+
+
+@pytest.fixture(scope="class", autouse=True)
+def create_disk():
+ """Create 16M sparse file"""
+ pathlib.Path(DISK_PATH).touch()
+ os.truncate(DISK_PATH, 1 << 24)
+ yield
+ os.remove(DISK_PATH)
+
+
+# Create undersized partition in bootcmd
+ALIAS_USERDATA = """\
+#cloud-config
+bootcmd:
+ - parted /dev/sdb --script \
+ mklabel gpt \
+ mkpart primary 0 1MiB
+ - parted /dev/sdb --script print
+growpart:
+ devices:
+ - "/"
+ - "/dev/sdb1"
+runcmd:
+ - parted /dev/sdb --script print
+"""
+
+
+@pytest.mark.user_data(ALIAS_USERDATA)
+@pytest.mark.lxd_setup.with_args(setup_and_mount_lxd_disk)
+@pytest.mark.ubuntu
+@pytest.mark.lxd_vm
+class TestGrowPart:
+ """Test growpart"""
+
+ def test_grow_part(self, client: IntegrationInstance):
+        """Verify growpart resized /dev/sdb1 to fill the 16M disk."""
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert (
+ "cc_growpart.py[INFO]: '/dev/sdb1' resized:"
+ " changed (/dev/sdb, 1) from" in log
+ )
+
+ lsblk = json.loads(client.execute("lsblk --json"))
+ sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0]
+ assert len(sdb["children"]) == 1
+ assert sdb["children"][0]["name"] == "sdb1"
+ assert sdb["size"] == "16M"
diff --git a/tests/integration_tests/modules/test_hotplug.py b/tests/integration_tests/modules/test_hotplug.py
new file mode 100644
index 00000000..0bad761e
--- /dev/null
+++ b/tests/integration_tests/modules/test_hotplug.py
@@ -0,0 +1,112 @@
+import time
+from collections import namedtuple
+
+import pytest
+import yaml
+
+from tests.integration_tests.instances import IntegrationInstance
+
+USER_DATA = """\
+#cloud-config
+updates:
+ network:
+ when: ['hotplug']
+"""
+
+ip_addr = namedtuple("ip_addr", "interface state ip4 ip6")
+
+
+def _wait_till_hotplug_complete(client, expected_runs=1):
+ for _ in range(60):
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if log.count("Exiting hotplug handler") == expected_runs:
+ return log
+ time.sleep(1)
+ raise Exception("Waiting for hotplug handler failed")
+
+
+def _get_ip_addr(client):
+ ips = []
+ lines = client.execute("ip --brief addr").split("\n")
+ for line in lines:
+ attributes = line.split()
+ interface, state = attributes[0], attributes[1]
+ ip4_cidr = attributes[2] if len(attributes) > 2 else None
+ ip6_cidr = attributes[3] if len(attributes) > 3 else None
+ ip4 = ip4_cidr.split("/")[0] if ip4_cidr else None
+ ip6 = ip6_cidr.split("/")[0] if ip6_cidr else None
+ ip = ip_addr(interface, state, ip4, ip6)
+ ips.append(ip)
+ return ips
+
+
+@pytest.mark.openstack
+# On Bionic, we traceback when attempting to detect the hotplugged
+# device in the updated metadata. This is because Bionic is specifically
+# configured not to provide network metadata.
+@pytest.mark.not_bionic
+@pytest.mark.user_data(USER_DATA)
+def test_hotplug_add_remove(client: IntegrationInstance):
+ ips_before = _get_ip_addr(client)
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Exiting hotplug handler" not in log
+ assert client.execute(
+ "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules"
+ ).ok
+
+ # Add new NIC
+ added_ip = client.instance.add_network_interface()
+ _wait_till_hotplug_complete(client, expected_runs=1)
+ ips_after_add = _get_ip_addr(client)
+ new_addition = [ip for ip in ips_after_add if ip.ip4 == added_ip][0]
+
+ assert len(ips_after_add) == len(ips_before) + 1
+ assert added_ip not in [ip.ip4 for ip in ips_before]
+ assert added_ip in [ip.ip4 for ip in ips_after_add]
+ assert new_addition.state == "UP"
+
+ netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml")
+ config = yaml.safe_load(netplan_cfg)
+ assert new_addition.interface in config["network"]["ethernets"]
+
+ # Remove new NIC
+ client.instance.remove_network_interface(added_ip)
+ _wait_till_hotplug_complete(client, expected_runs=2)
+ ips_after_remove = _get_ip_addr(client)
+ assert len(ips_after_remove) == len(ips_before)
+ assert added_ip not in [ip.ip4 for ip in ips_after_remove]
+
+ netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml")
+ config = yaml.safe_load(netplan_cfg)
+ assert new_addition.interface not in config["network"]["ethernets"]
+
+ assert "enabled" == client.execute(
+ "cloud-init devel hotplug-hook -s net query"
+ )
+
+
+@pytest.mark.openstack
+def test_no_hotplug_in_userdata(client: IntegrationInstance):
+ ips_before = _get_ip_addr(client)
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Exiting hotplug handler" not in log
+ assert client.execute(
+ "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules"
+ ).failed
+
+ # Add new NIC
+ client.instance.add_network_interface()
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "hotplug-hook" not in log
+
+ ips_after_add = _get_ip_addr(client)
+ if len(ips_after_add) == len(ips_before) + 1:
+ # We can see the device, but it should not have been brought up
+ new_ip = [ip for ip in ips_after_add if ip not in ips_before][0]
+ assert new_ip.state == "DOWN"
+ else:
+ assert len(ips_after_add) == len(ips_before)
+
+ assert "disabled" == client.execute(
+ "cloud-init devel hotplug-hook -s net query"
+ )
diff --git a/tests/integration_tests/modules/test_jinja_templating.py b/tests/integration_tests/modules/test_jinja_templating.py
new file mode 100644
index 00000000..7788c6f0
--- /dev/null
+++ b/tests/integration_tests/modules/test_jinja_templating.py
@@ -0,0 +1,33 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.util import verify_ordered_items_in_text
+
+USER_DATA = """\
+## template: jinja
+#cloud-config
+runcmd:
+ - echo {{v1.local_hostname}} > /var/tmp/runcmd_output
+ - echo {{merged_cfg._doc}} >> /var/tmp/runcmd_output
+ - echo {{v1['local-hostname']}} >> /var/tmp/runcmd_output
+"""
+
+
+@pytest.mark.user_data(USER_DATA)
+def test_runcmd_with_variable_substitution(client: IntegrationInstance):
+ """Test jinja substitution.
+
+ Ensure underscore-delimited aliases exist for hyphenated key and
+ we can also substitute variables from instance-data-sensitive
+ LP: #1931392.
+ """
+ hostname = client.execute("hostname").stdout.strip()
+ expected = [
+ hostname,
+ "Merged cloud-init system config from /etc/cloud/cloud.cfg and "
+ "/etc/cloud/cloud.cfg.d/",
+ hostname,
+ ]
+ output = client.read_from_file("/var/tmp/runcmd_output")
+ verify_ordered_items_in_text(expected, output)
diff --git a/tests/integration_tests/modules/test_keyboard.py b/tests/integration_tests/modules/test_keyboard.py
new file mode 100644
index 00000000..7db35014
--- /dev/null
+++ b/tests/integration_tests/modules/test_keyboard.py
@@ -0,0 +1,17 @@
+import pytest
+
+USER_DATA = """\
+#cloud-config
+keyboard:
+ layout: de
+ model: pc105
+ variant: nodeadkeys
+ options: compose:rwin
+"""
+
+
+class TestKeyboard:
+ @pytest.mark.user_data(USER_DATA)
+ def test_keyboard(self, client):
+ lc = client.execute("localectl")
+ assert "X11 Layout: de" in lc
diff --git a/tests/integration_tests/modules/test_keys_to_console.py b/tests/integration_tests/modules/test_keys_to_console.py
new file mode 100644
index 00000000..50899982
--- /dev/null
+++ b/tests/integration_tests/modules/test_keys_to_console.py
@@ -0,0 +1,113 @@
+"""Integration tests for the cc_keys_to_console module.
+
+(This is ported from
+``tests/cloud_tests/testcases/modules/keys_to_console.yaml``.)"""
+import pytest
+
+from tests.integration_tests.util import retry
+
+BLACKLIST_USER_DATA = """\
+#cloud-config
+ssh_fp_console_blacklist: [ssh-dss, ssh-dsa, ecdsa-sha2-nistp256]
+ssh_key_console_blacklist: [ssh-dss, ssh-dsa, ecdsa-sha2-nistp256]
+"""
+
+BLACKLIST_ALL_KEYS_USER_DATA = """\
+#cloud-config
+ssh_fp_console_blacklist: [ssh-dsa, ssh-ecdsa, ssh-ed25519, ssh-rsa, ssh-dss, ecdsa-sha2-nistp256]
+""" # noqa: E501
+
+DISABLED_USER_DATA = """\
+#cloud-config
+ssh:
+ emit_keys_to_console: false
+"""
+
+ENABLE_KEYS_TO_CONSOLE_USER_DATA = """\
+#cloud-config
+ssh:
+ emit_keys_to_console: true
+users:
+ - default
+ - name: barfoo
+"""
+
+
+@pytest.mark.user_data(BLACKLIST_USER_DATA)
+class TestKeysToConsoleBlacklist:
+ """Test that the blacklist options work as expected."""
+
+ @pytest.mark.parametrize("key_type", ["DSA", "ECDSA"])
+ def test_excluded_keys(self, class_client, key_type):
+ syslog = class_client.read_from_file("/var/log/syslog")
+ assert "({})".format(key_type) not in syslog
+
+ # retry decorator here because it can take some time to be reflected
+ # in syslog
+ @retry(tries=30, delay=1)
+ @pytest.mark.parametrize("key_type", ["ED25519", "RSA"])
+ def test_included_keys(self, class_client, key_type):
+ syslog = class_client.read_from_file("/var/log/syslog")
+ assert "({})".format(key_type) in syslog
+
+
+@pytest.mark.user_data(BLACKLIST_ALL_KEYS_USER_DATA)
+class TestAllKeysToConsoleBlacklist:
+ """Test that when key blacklist contains all key types that
+ no header/footer are output.
+ """
+
+ def test_header_excluded(self, class_client):
+ syslog = class_client.read_from_file("/var/log/syslog")
+ assert "BEGIN SSH HOST KEY FINGERPRINTS" not in syslog
+
+ def test_footer_excluded(self, class_client):
+ syslog = class_client.read_from_file("/var/log/syslog")
+ assert "END SSH HOST KEY FINGERPRINTS" not in syslog
+
+
+@pytest.mark.user_data(DISABLED_USER_DATA)
+class TestKeysToConsoleDisabled:
+ """Test that output can be fully disabled."""
+
+ @pytest.mark.parametrize("key_type", ["DSA", "ECDSA", "ED25519", "RSA"])
+ def test_keys_excluded(self, class_client, key_type):
+ syslog = class_client.read_from_file("/var/log/syslog")
+ assert "({})".format(key_type) not in syslog
+
+ def test_header_excluded(self, class_client):
+ syslog = class_client.read_from_file("/var/log/syslog")
+ assert "BEGIN SSH HOST KEY FINGERPRINTS" not in syslog
+
+ def test_footer_excluded(self, class_client):
+ syslog = class_client.read_from_file("/var/log/syslog")
+ assert "END SSH HOST KEY FINGERPRINTS" not in syslog
+
+
+@pytest.mark.user_data(ENABLE_KEYS_TO_CONSOLE_USER_DATA)
+@pytest.mark.ec2
+@pytest.mark.lxd_container
+@pytest.mark.oci
+@pytest.mark.openstack
+class TestKeysToConsoleEnabled:
+    """Test that keys-to-console output can be explicitly enabled."""
+
+ def test_duplicate_messaging_console_log(self, class_client):
+ class_client.execute("cloud-init status --wait --long").ok
+ try:
+ console_log = class_client.instance.console_log()
+ except NotImplementedError:
+ # Assume that an exception here means that we can't use the console
+ # log
+ pytest.skip("NotImplementedError when requesting console log")
+ return
+ if console_log.lower() == "no console output":
+ # This test retries because we might not have the full console log
+ # on the first fetch. However, if we have no console output
+ # at all, we don't want to keep retrying as that would trigger
+ # another 5 minute wait on the pycloudlib side, which could
+ # leave us waiting for a couple hours
+ pytest.fail("no console output")
+ return
+ msg = "no authorized SSH keys fingerprints found for user barfoo."
+ assert 1 == console_log.count(msg)
diff --git a/tests/integration_tests/modules/test_lxd_bridge.py b/tests/integration_tests/modules/test_lxd_bridge.py
new file mode 100644
index 00000000..3292a833
--- /dev/null
+++ b/tests/integration_tests/modules/test_lxd_bridge.py
@@ -0,0 +1,46 @@
+"""Integration tests for LXD bridge creation.
+
+(This is ported from
+``tests/cloud_tests/testcases/modules/lxd_bridge.yaml``.)
+"""
+import pytest
+import yaml
+
+from tests.integration_tests.util import verify_clean_log
+
+USER_DATA = """\
+#cloud-config
+lxd:
+ init:
+ storage_backend: dir
+ bridge:
+ mode: new
+ name: lxdbr0
+ ipv4_address: 10.100.100.1
+ ipv4_netmask: 24
+ ipv4_dhcp_first: 10.100.100.100
+ ipv4_dhcp_last: 10.100.100.200
+ ipv4_nat: true
+ domain: lxd
+"""
+
+
+@pytest.mark.no_container
+@pytest.mark.user_data(USER_DATA)
+class TestLxdBridge:
+ @pytest.mark.parametrize("binary_name", ["lxc", "lxd"])
+ def test_binaries_installed(self, class_client, binary_name):
+ """Check that the expected LXD binaries are installed"""
+ assert class_client.execute(["which", binary_name]).ok
+
+ def test_bridge(self, class_client):
+ """Check that the given bridge is configured"""
+ cloud_init_log = class_client.read_from_file("/var/log/cloud-init.log")
+ verify_clean_log(cloud_init_log)
+
+ # The bridge should exist
+ assert class_client.execute("ip addr show lxdbr0")
+
+ raw_network_config = class_client.execute("lxc network show lxdbr0")
+ network_config = yaml.safe_load(raw_network_config)
+ assert "10.100.100.1/24" == network_config["config"]["ipv4.address"]
diff --git a/tests/integration_tests/modules/test_ntp_servers.py b/tests/integration_tests/modules/test_ntp_servers.py
index e72389c1..fc62e63b 100644
--- a/tests/integration_tests/modules/test_ntp_servers.py
+++ b/tests/integration_tests/modules/test_ntp_servers.py
@@ -1,14 +1,18 @@
-"""Integration test for the ntp module's ``servers`` functionality with ntp.
+"""Integration test for the ntp module's ntp functionality.
This test specifies the use of the `ntp` NTP client, and ensures that the given
NTP servers are configured as expected.
-(This is ported from ``tests/cloud_tests/testcases/modules/ntp_servers.yaml``.)
+(This is ported from ``tests/cloud_tests/testcases/modules/ntp_servers.yaml``,
+``tests/cloud_tests/testcases/modules/ntp_pools.yaml``,
+and ``tests/cloud_tests/testcases/modules/ntp_chrony.yaml``)
"""
import re
-import yaml
import pytest
+import yaml
+
+from tests.integration_tests.instances import IntegrationInstance
USER_DATA = """\
#cloud-config
@@ -17,21 +21,25 @@ ntp:
servers:
- 172.16.15.14
- 172.16.17.18
+ pools:
+ - 0.cloud-init.mypool
+ - 1.cloud-init.mypool
+ - 172.16.15.15
"""
EXPECTED_SERVERS = yaml.safe_load(USER_DATA)["ntp"]["servers"]
+EXPECTED_POOLS = yaml.safe_load(USER_DATA)["ntp"]["pools"]
-@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestNtpServers:
-
- def test_ntp_installed(self, class_client):
+ def test_ntp_installed(self, class_client: IntegrationInstance):
"""Test that `ntpd --version` succeeds, indicating installation."""
- result = class_client.execute("ntpd --version")
- assert 0 == result.return_code
+ assert class_client.execute("ntpd --version").ok
- def test_dist_config_file_is_empty(self, class_client):
+ def test_dist_config_file_is_empty(
+ self, class_client: IntegrationInstance
+ ):
"""Test that the distributed config file is empty.
(This test is skipped on all currently supported Ubuntu releases, so
@@ -42,17 +50,79 @@ class TestNtpServers:
dist_file = class_client.read_from_file("/etc/ntp.conf.dist")
assert 0 == len(dist_file.strip().splitlines())
- def test_ntp_entries(self, class_client):
+ def test_ntp_entries(self, class_client: IntegrationInstance):
ntp_conf = class_client.read_from_file("/etc/ntp.conf")
for expected_server in EXPECTED_SERVERS:
assert re.search(
r"^server {} iburst".format(expected_server),
ntp_conf,
- re.MULTILINE
+ re.MULTILINE,
+ )
+ for expected_pool in EXPECTED_POOLS:
+ assert re.search(
+ r"^pool {} iburst".format(expected_pool),
+ ntp_conf,
+ re.MULTILINE,
)
- def test_ntpq_servers(self, class_client):
+ def test_ntpq_servers(self, class_client: IntegrationInstance):
result = class_client.execute("ntpq -p -w -n")
assert result.ok
- for expected_server in EXPECTED_SERVERS:
- assert expected_server in result.stdout
+ for expected_server_or_pool in [*EXPECTED_SERVERS, *EXPECTED_POOLS]:
+ assert expected_server_or_pool in result.stdout
+
+
+CHRONY_DATA = """\
+#cloud-config
+ntp:
+ enabled: true
+ ntp_client: chrony
+ servers:
+ - 172.16.15.14
+"""
+
+
+@pytest.mark.user_data(CHRONY_DATA)
+def test_chrony(client: IntegrationInstance):
+ if client.execute("test -f /etc/chrony.conf").ok:
+ chrony_conf = "/etc/chrony.conf"
+ else:
+ chrony_conf = "/etc/chrony/chrony.conf"
+ contents = client.read_from_file(chrony_conf)
+ assert "server 172.16.15.14" in contents
+
+
+TIMESYNCD_DATA = """\
+#cloud-config
+ntp:
+ enabled: true
+ ntp_client: systemd-timesyncd
+ servers:
+ - 172.16.15.14
+"""
+
+
+@pytest.mark.user_data(TIMESYNCD_DATA)
+def test_timesyncd(client: IntegrationInstance):
+ contents = client.read_from_file(
+ "/etc/systemd/timesyncd.conf.d/cloud-init.conf"
+ )
+ assert "NTP=172.16.15.14" in contents
+
+
+EMPTY_NTP = """\
+#cloud-config
+ntp:
+ ntp_client: ntp
+ pools: []
+ servers: []
+"""
+
+
+@pytest.mark.user_data(EMPTY_NTP)
+def test_empty_ntp(client: IntegrationInstance):
+ assert client.execute("ntpd --version").ok
+ assert client.execute("test -f /etc/ntp.conf.dist").failed
+ assert "pool.ntp.org iburst" in client.execute(
+ 'grep -v "^#" /etc/ntp.conf'
+ )
diff --git a/tests/integration_tests/modules/test_package_update_upgrade_install.py b/tests/integration_tests/modules/test_package_update_upgrade_install.py
index 8a38ad84..d668d81c 100644
--- a/tests/integration_tests/modules/test_package_update_upgrade_install.py
+++ b/tests/integration_tests/modules/test_package_update_upgrade_install.py
@@ -13,8 +13,8 @@ NOTE: the testcase for this looks for the command in history.log as
"""
import re
-import pytest
+import pytest
USER_DATA = """\
#cloud-config
@@ -26,9 +26,9 @@ package_upgrade: true
"""
+@pytest.mark.ubuntu
@pytest.mark.user_data(USER_DATA)
class TestPackageUpdateUpgradeInstall:
-
def assert_package_installed(self, pkg_out, name, version=None):
"""Check dpkg-query --show output for matching package name.
@@ -37,7 +37,8 @@ class TestPackageUpdateUpgradeInstall:
version.
"""
pkg_match = re.search(
- "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE)
+ "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE
+ )
if pkg_match:
installed_version = pkg_match.group("version")
if not version:
@@ -45,8 +46,10 @@ class TestPackageUpdateUpgradeInstall:
if installed_version.startswith(version):
return # Success
raise AssertionError(
- "Expected package version %s-%s not found. Found %s" %
- name, version, installed_version)
+            "Expected package version %s-%s not found."
+            " Found %s"
+            % (name, version, installed_version)
+        )
raise AssertionError("Package not installed: %s" % name)
def test_new_packages_are_installed(self, class_client):
@@ -57,11 +60,13 @@ class TestPackageUpdateUpgradeInstall:
def test_packages_were_updated(self, class_client):
out = class_client.execute(
- "grep ^Commandline: /var/log/apt/history.log")
+ "grep ^Commandline: /var/log/apt/history.log"
+ )
assert (
"Commandline: /usr/bin/apt-get --option=Dpkg::Options"
"::=--force-confold --option=Dpkg::options::=--force-unsafe-io "
- "--assume-yes --quiet install sl tree") in out
+ "--assume-yes --quiet install sl tree" in out
+ )
def test_packages_were_upgraded(self, class_client):
"""Test cloud-init-output for install & upgrade stuff."""
diff --git a/tests/integration_tests/modules/test_persistence.py b/tests/integration_tests/modules/test_persistence.py
new file mode 100644
index 00000000..33527e1e
--- /dev/null
+++ b/tests/integration_tests/modules/test_persistence.py
@@ -0,0 +1,32 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+"""Test the behavior of loading/discarding pickle data"""
+from pathlib import Path
+
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.util import (
+ ASSETS_DIR,
+ verify_ordered_items_in_text,
+)
+
+PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl")
+TEST_PICKLE = ASSETS_DIR / "trusty_with_mime.pkl"
+
+
+@pytest.mark.lxd_container
+def test_log_message_on_missing_version_file(client: IntegrationInstance):
+ client.push_file(TEST_PICKLE, PICKLE_PATH)
+ client.restart()
+ assert client.execute("cloud-init status --wait").ok
+ log = client.read_from_file("/var/log/cloud-init.log")
+ verify_ordered_items_in_text(
+ [
+ "Unable to unpickle datasource: 'MIMEMultipart' object has no "
+ "attribute 'policy'. Ignoring current cache.",
+ "no cache found",
+ "Searching for local data source",
+ "SUCCESS: found local data from DataSourceNoCloud",
+ ],
+ log,
+ )
diff --git a/tests/integration_tests/modules/test_power_state_change.py b/tests/integration_tests/modules/test_power_state_change.py
new file mode 100644
index 00000000..5cd19764
--- /dev/null
+++ b/tests/integration_tests/modules/test_power_state_change.py
@@ -0,0 +1,97 @@
+"""Integration test of the cc_power_state_change module.
+
+Test that the power state config options work as expected.
+"""
+
+import time
+
+import pytest
+
+from tests.integration_tests.clouds import IntegrationCloud
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.util import verify_ordered_items_in_text
+
+USER_DATA = """\
+#cloud-config
+power_state:
+ delay: {delay}
+ mode: {mode}
+ message: msg
+ timeout: {timeout}
+ condition: {condition}
+"""
+
+
+def _detect_reboot(instance: IntegrationInstance):
+ # We'll wait for instance up here, but we don't know if we're
+ # detecting the first boot or second boot, so we also check
+ # the logs to ensure we've booted twice. If the logs show we've
+ # only booted once, wait until we've booted twice
+ instance.instance.wait()
+ for _ in range(600):
+ try:
+ log = instance.read_from_file("/var/log/cloud-init.log")
+ boot_count = log.count("running 'init-local'")
+ if boot_count == 1:
+ instance.instance.wait()
+ elif boot_count > 1:
+ break
+ except Exception:
+ pass
+ time.sleep(1)
+ else:
+ raise Exception("Could not detect reboot")
+
+
+def _can_connect(instance):
+ return instance.execute("true").ok
+
+
+# This test is marked unstable because even though it should be able to
+# run anywhere, I can only get it to run in an lxd container, and even then
+# occasionally some timing issues will crop up.
+@pytest.mark.unstable
+@pytest.mark.ubuntu
+@pytest.mark.lxd_container
+class TestPowerChange:
+ @pytest.mark.parametrize(
+ "mode,delay,timeout,expected",
+ [
+ ("poweroff", "now", "10", "will execute: shutdown -P now msg"),
+ ("reboot", "now", "0", "will execute: shutdown -r now msg"),
+ ("halt", "+1", "0", "will execute: shutdown -H +1 msg"),
+ ],
+ )
+ def test_poweroff(
+ self, session_cloud: IntegrationCloud, mode, delay, timeout, expected
+ ):
+ with session_cloud.launch(
+ user_data=USER_DATA.format(
+ delay=delay, mode=mode, timeout=timeout, condition="true"
+ ),
+ launch_kwargs={"wait": False},
+ ) as instance:
+ if mode == "reboot":
+ _detect_reboot(instance)
+ else:
+ instance.instance.wait_for_stop()
+ instance.instance.start(wait=True)
+ log = instance.read_from_file("/var/log/cloud-init.log")
+ assert _can_connect(instance)
+ lines_to_check = [
+ "Running module power-state-change",
+ expected,
+ "running 'init-local'",
+ "config-power-state-change already ran",
+ ]
+ verify_ordered_items_in_text(lines_to_check, log)
+
+ @pytest.mark.user_data(
+ USER_DATA.format(
+ delay="0", mode="poweroff", timeout="0", condition="false"
+ )
+ )
+ def test_poweroff_false_condition(self, client: IntegrationInstance):
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert _can_connect(client)
+ assert "Condition was false. Will not perform state change" in log
diff --git a/tests/integration_tests/modules/test_puppet.py b/tests/integration_tests/modules/test_puppet.py
new file mode 100644
index 00000000..1bd9cee4
--- /dev/null
+++ b/tests/integration_tests/modules/test_puppet.py
@@ -0,0 +1,39 @@
+"""Test installation configuration of puppet module."""
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.util import verify_clean_log
+
+SERVICE_DATA = """\
+#cloud-config
+puppet:
+ install: true
+ install_type: packages
+"""
+
+
+@pytest.mark.user_data(SERVICE_DATA)
+def test_puppet_service(client: IntegrationInstance):
+ """Basic test that puppet gets installed and runs."""
+ log = client.read_from_file("/var/log/cloud-init.log")
+ verify_clean_log(log)
+ assert client.execute("systemctl is-active puppet").ok
+ assert "Running command ['puppet', 'agent'" not in log
+
+
+EXEC_DATA = """\
+#cloud-config
+puppet:
+ install: true
+ install_type: packages
+ exec: true
+ exec_args: ['--noop']
+"""
+
+
+# 'exec: true' with exec_args should make cloud-init run 'puppet agent --noop'
+@pytest.mark.user_data(EXEC_DATA)
+def test_puppet_exec(client: IntegrationInstance):
+ """Basic test that puppet gets installed and runs."""
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Running command ['puppet', 'agent', '--noop']" in log
diff --git a/tests/integration_tests/modules/test_runcmd.py b/tests/integration_tests/modules/test_runcmd.py
deleted file mode 100644
index 50d1851e..00000000
--- a/tests/integration_tests/modules/test_runcmd.py
+++ /dev/null
@@ -1,25 +0,0 @@
-"""Integration test for the runcmd module.
-
-This test specifies a command to be executed by the ``runcmd`` module
-and then checks if that command was executed during boot.
-
-(This is ported from
-``tests/cloud_tests/testcases/modules/runcmd.yaml``.)"""
-
-import pytest
-
-
-USER_DATA = """\
-#cloud-config
-runcmd:
- - echo cloud-init run cmd test > /var/tmp/run_cmd
-"""
-
-
-@pytest.mark.ci
-class TestRuncmd:
-
- @pytest.mark.user_data(USER_DATA)
- def test_runcmd(self, client):
- runcmd_output = client.read_from_file("/var/tmp/run_cmd")
- assert runcmd_output.strip() == "cloud-init run cmd test"
diff --git a/tests/integration_tests/modules/test_seed_random_data.py b/tests/integration_tests/modules/test_seed_random_data.py
deleted file mode 100644
index b365fa98..00000000
--- a/tests/integration_tests/modules/test_seed_random_data.py
+++ /dev/null
@@ -1,28 +0,0 @@
-"""Integration test for the random seed module.
-
-This test specifies a command to be executed by the ``seed_random`` module, by
-providing a different data to be used as seed data. We will then check
-if that seed data was actually used.
-
-(This is ported from
-``tests/cloud_tests/testcases/modules/seed_random_data.yaml``.)"""
-
-import pytest
-
-
-USER_DATA = """\
-#cloud-config
-random_seed:
- data: 'MYUb34023nD:LFDK10913jk;dfnk:Df'
- encoding: raw
- file: /root/seed
-"""
-
-
-@pytest.mark.ci
-class TestSeedRandomData:
-
- @pytest.mark.user_data(USER_DATA)
- def test_seed_random_data(self, client):
- seed_output = client.read_from_file("/root/seed")
- assert seed_output.strip() == "MYUb34023nD:LFDK10913jk;dfnk:Df"
diff --git a/tests/integration_tests/modules/test_set_hostname.py b/tests/integration_tests/modules/test_set_hostname.py
index 2bfa403d..ae0aeae9 100644
--- a/tests/integration_tests/modules/test_set_hostname.py
+++ b/tests/integration_tests/modules/test_set_hostname.py
@@ -11,7 +11,6 @@ after the system is boot.
import pytest
-
USER_DATA_HOSTNAME = """\
#cloud-config
hostname: cloudinit2
@@ -24,15 +23,31 @@ hostname: cloudinit1
fqdn: cloudinit2.i9n.cloud-init.io
"""
+USER_DATA_PREFER_FQDN = """\
+#cloud-config
+prefer_fqdn_over_hostname: {}
+hostname: cloudinit1
+fqdn: cloudinit2.test.io
+"""
+
@pytest.mark.ci
class TestHostname:
-
@pytest.mark.user_data(USER_DATA_HOSTNAME)
def test_hostname(self, client):
hostname_output = client.execute("hostname")
assert "cloudinit2" in hostname_output.strip()
+ @pytest.mark.user_data(USER_DATA_PREFER_FQDN.format(True))
+ def test_prefer_fqdn(self, client):
+ hostname_output = client.execute("hostname")
+ assert "cloudinit2.test.io" in hostname_output.strip()
+
+ @pytest.mark.user_data(USER_DATA_PREFER_FQDN.format(False))
+ def test_prefer_short_hostname(self, client):
+ hostname_output = client.execute("hostname")
+ assert "cloudinit1" in hostname_output.strip()
+
@pytest.mark.user_data(USER_DATA_FQDN)
def test_hostname_and_fqdn(self, client):
hostname_output = client.execute("hostname")
@@ -42,6 +57,8 @@ class TestHostname:
assert "cloudinit2.i9n.cloud-init.io" in fqdn_output.strip()
host_output = client.execute("grep ^127 /etc/hosts")
- assert '127.0.1.1 {} {}'.format(
- fqdn_output, hostname_output) in host_output
- assert '127.0.0.1 localhost' in host_output
+ assert (
+ "127.0.1.1 {} {}".format(fqdn_output, hostname_output)
+ in host_output
+ )
+ assert "127.0.0.1 localhost" in host_output
diff --git a/tests/integration_tests/modules/test_set_password.py b/tests/integration_tests/modules/test_set_password.py
index b13f76fb..0e35cd26 100644
--- a/tests/integration_tests/modules/test_set_password.py
+++ b/tests/integration_tests/modules/test_set_password.py
@@ -8,11 +8,10 @@ other tests chpasswd's list being a string. Both expect the same results, so
they use a mixin to share their test definitions, because we can (of course)
only specify one user-data per instance.
"""
-import crypt
-
import pytest
import yaml
+from tests.integration_tests.util import retry
COMMON_USER_DATA = """\
#cloud-config
@@ -40,7 +39,9 @@ Uh69tP4GSrGW5XKHxMLiKowJgm/"
lock_passwd: false
"""
-LIST_USER_DATA = COMMON_USER_DATA + """
+LIST_USER_DATA = (
+ COMMON_USER_DATA
+ + """
chpasswd:
list:
- tom:mypassword123!
@@ -48,8 +49,11 @@ chpasswd:
- harry:RANDOM
- mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89
"""
+)
-STRING_USER_DATA = COMMON_USER_DATA + """
+STRING_USER_DATA = (
+ COMMON_USER_DATA
+ + """
chpasswd:
list: |
tom:mypassword123!
@@ -57,6 +61,7 @@ chpasswd:
harry:RANDOM
mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89
"""
+)
USERS_DICTS = yaml.safe_load(COMMON_USER_DATA)["users"]
USERS_PASSWD_VALUES = {
@@ -116,14 +121,52 @@ class Mixin:
# Which are not the same
assert shadow_users["harry"] != shadow_users["dick"]
+ def test_random_passwords_not_stored_in_cloud_init_output_log(
+ self, class_client
+ ):
+ """We should not emit passwords to the in-instance log file.
+
+ LP: #1918303
+ """
+ cloud_init_output = class_client.read_from_file(
+ "/var/log/cloud-init-output.log"
+ )
+ assert "dick:" not in cloud_init_output
+ assert "harry:" not in cloud_init_output
+
+ @retry(tries=30, delay=1)
+ def test_random_passwords_emitted_to_serial_console(self, class_client):
+ """We should emit passwords to the serial console. (LP: #1918303)"""
+ try:
+ console_log = class_client.instance.console_log()
+ except NotImplementedError:
+ # Assume that an exception here means that we can't use the console
+ # log
+ pytest.skip("NotImplementedError when requesting console log")
+ return
+ if console_log.lower() == "no console output":
+ # This test retries because we might not have the full console log
+ # on the first fetch. However, if we have no console output
+ # at all, we don't want to keep retrying as that would trigger
+ # another 5 minute wait on the pycloudlib side, which could
+ # leave us waiting for a couple hours
+ pytest.fail("no console output")
+ return
+ assert "dick:" in console_log
+ assert "harry:" in console_log
+
def test_explicit_password_set_correctly(self, class_client):
"""Test that an explicitly-specified password is set correctly."""
shadow_users, _ = self._fetch_and_parse_etc_shadow(class_client)
fmt_and_salt = shadow_users["tom"].rsplit("$", 1)[0]
- expected_value = crypt.crypt("mypassword123!", fmt_and_salt)
-
- assert expected_value == shadow_users["tom"]
+ GEN_CRYPT_CONTENT = (
+ "import crypt\n"
+ f"print(crypt.crypt('mypassword123!', '{fmt_and_salt}'))\n"
+ )
+ class_client.write_to_file("/gen_crypt.py", GEN_CRYPT_CONTENT)
+ result = class_client.execute("python3 /gen_crypt.py")
+ assert result.stdout == shadow_users["tom"]
def test_shadow_expected_users(self, class_client):
"""Test that the right set of users is in /etc/shadow."""
diff --git a/tests/integration_tests/modules/test_snap.py b/tests/integration_tests/modules/test_snap.py
deleted file mode 100644
index b626f6b0..00000000
--- a/tests/integration_tests/modules/test_snap.py
+++ /dev/null
@@ -1,29 +0,0 @@
-"""Integration test for the snap module.
-
-This test specifies a command to be executed by the ``snap`` module
-and then checks that if that command was executed during boot.
-
-(This is ported from
-``tests/cloud_tests/testcases/modules/runcmd.yaml``.)"""
-
-import pytest
-
-
-USER_DATA = """\
-#cloud-config
-package_update: true
-snap:
- squashfuse_in_container: true
- commands:
- - snap install hello-world
-"""
-
-
-@pytest.mark.ci
-class TestSnap:
-
- @pytest.mark.user_data(USER_DATA)
- def test_snap(self, client):
- snap_output = client.execute("snap list")
- assert "core " in snap_output
- assert "hello-world " in snap_output
diff --git a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
index b9b0d85e..89b49576 100644
--- a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
+++ b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py
@@ -12,13 +12,14 @@ import re
import pytest
+from tests.integration_tests.util import retry
USER_DATA_SSH_AUTHKEY_DISABLE = """\
#cloud-config
no_ssh_fingerprints: true
"""
-USER_DATA_SSH_AUTHKEY_ENABLE="""\
+USER_DATA_SSH_AUTHKEY_ENABLE = """\
#cloud-config
ssh_genkeytypes:
- ecdsa
@@ -30,19 +31,22 @@ ssh_authorized_keys:
@pytest.mark.ci
class TestSshAuthkeyFingerprints:
-
@pytest.mark.user_data(USER_DATA_SSH_AUTHKEY_DISABLE)
def test_ssh_authkey_fingerprints_disable(self, client):
cloudinit_output = client.read_from_file("/var/log/cloud-init.log")
assert (
"Skipping module named ssh-authkey-fingerprints, "
- "logging of SSH fingerprints disabled") in cloudinit_output
+ "logging of SSH fingerprints disabled" in cloudinit_output
+ )
+ # retry decorator here because it can take some time to be reflected
+ # in syslog
+ @retry(tries=30, delay=1)
@pytest.mark.user_data(USER_DATA_SSH_AUTHKEY_ENABLE)
def test_ssh_authkey_fingerprints_enable(self, client):
syslog_output = client.read_from_file("/var/log/syslog")
- assert re.search(r'256 SHA256:.*(ECDSA)', syslog_output) is not None
- assert re.search(r'256 SHA256:.*(ED25519)', syslog_output) is not None
- assert re.search(r'1024 SHA256:.*(DSA)', syslog_output) is None
- assert re.search(r'2048 SHA256:.*(RSA)', syslog_output) is None
+ assert re.search(r"256 SHA256:.*(ECDSA)", syslog_output) is not None
+ assert re.search(r"256 SHA256:.*(ED25519)", syslog_output) is not None
+ assert re.search(r"1024 SHA256:.*(DSA)", syslog_output) is None
+ assert re.search(r"2048 SHA256:.*(RSA)", syslog_output) is None
diff --git a/tests/integration_tests/modules/test_ssh_generate.py b/tests/integration_tests/modules/test_ssh_generate.py
index 60c36982..1dd0adf1 100644
--- a/tests/integration_tests/modules/test_ssh_generate.py
+++ b/tests/integration_tests/modules/test_ssh_generate.py
@@ -10,7 +10,6 @@ keys were created.
import pytest
-
USER_DATA = """\
#cloud-config
ssh_genkeytypes:
@@ -23,28 +22,27 @@ authkey_hash: sha512
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestSshKeysGenerate:
-
@pytest.mark.parametrize(
- "ssh_key_path", (
+ "ssh_key_path",
+ (
"/etc/ssh/ssh_host_dsa_key.pub",
"/etc/ssh/ssh_host_dsa_key",
"/etc/ssh/ssh_host_rsa_key.pub",
"/etc/ssh/ssh_host_rsa_key",
- )
+ ),
)
def test_ssh_keys_not_generated(self, ssh_key_path, class_client):
- out = class_client.execute(
- "test -e {}".format(ssh_key_path)
- )
+ out = class_client.execute("test -e {}".format(ssh_key_path))
assert out.failed
@pytest.mark.parametrize(
- "ssh_key_path", (
+ "ssh_key_path",
+ (
"/etc/ssh/ssh_host_ecdsa_key.pub",
"/etc/ssh/ssh_host_ecdsa_key",
"/etc/ssh/ssh_host_ed25519_key.pub",
"/etc/ssh/ssh_host_ed25519_key",
- )
+ ),
)
def test_ssh_keys_generated(self, ssh_key_path, class_client):
out = class_client.read_from_file(ssh_key_path)
diff --git a/tests/integration_tests/modules/test_ssh_import_id.py b/tests/integration_tests/modules/test_ssh_import_id.py
deleted file mode 100644
index 45d37d6c..00000000
--- a/tests/integration_tests/modules/test_ssh_import_id.py
+++ /dev/null
@@ -1,29 +0,0 @@
-"""Integration test for the ssh_import_id module.
-
-This test specifies ssh keys to be imported by the ``ssh_import_id`` module
-and then checks that if the ssh keys were successfully imported.
-
-(This is ported from
-``tests/cloud_tests/testcases/modules/ssh_import_id.yaml``.)"""
-
-import pytest
-
-
-USER_DATA = """\
-#cloud-config
-ssh_import_id:
- - gh:powersj
- - lp:smoser
-"""
-
-
-@pytest.mark.ci
-class TestSshImportId:
-
- @pytest.mark.user_data(USER_DATA)
- def test_ssh_import_id(self, client):
- ssh_output = client.read_from_file(
- "/home/ubuntu/.ssh/authorized_keys")
-
- assert '# ssh-import-id gh:powersj' in ssh_output
- assert '# ssh-import-id lp:smoser' in ssh_output
diff --git a/tests/integration_tests/modules/test_ssh_keys_provided.py b/tests/integration_tests/modules/test_ssh_keys_provided.py
index 27d193c1..b79f18eb 100644
--- a/tests/integration_tests/modules/test_ssh_keys_provided.py
+++ b/tests/integration_tests/modules/test_ssh_keys_provided.py
@@ -9,7 +9,6 @@ system.
import pytest
-
USER_DATA = """\
#cloud-config
disable_root: false
@@ -82,67 +81,60 @@ ssh_keys:
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestSshKeysProvided:
-
- def test_ssh_dsa_keys_provided(self, class_client):
- """Test dsa public key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_dsa_key.pub")
- assert (
- "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R"
- "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM") in out
-
- """Test dsa private key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_dsa_key")
- assert (
- "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr"
- "hOVAfzZ6+jklP") in out
-
- def test_ssh_rsa_keys_provided(self, class_client):
- """Test rsa public key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key.pub")
- assert (
- "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT"
- "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4") in out
-
- """Test rsa private key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key")
- assert (
- "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un"
- "RQvLZpMRdywBm") in out
-
- def test_ssh_rsa_certificate_provided(self, class_client):
- """Test rsa certificate was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key-cert.pub")
- assert (
- "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg"
- "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD") in out
-
- def test_ssh_certificate_updated_sshd_config(self, class_client):
- """Test ssh certificate was added to /etc/ssh/sshd_config."""
- out = class_client.read_from_file("/etc/ssh/sshd_config").strip()
- assert "HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub" in out
-
- def test_ssh_ecdsa_keys_provided(self, class_client):
- """Test ecdsa public key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_ecdsa_key.pub")
- assert (
- "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB"
- "BBFsS5Tvky/IC/dXhE/afxxU") in out
-
- """Test ecdsa private key generated."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_ecdsa_key")
- assert (
- "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY"
- "5mpZqxgX4vcgb") in out
-
- def test_ssh_ed25519_keys_provided(self, class_client):
- """Test ed25519 public key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_ed25519_key.pub")
- assert (
- "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6"
- "G15dqjQ2XkNVOEnb5") in out
-
- """Test ed25519 private key was imported."""
- out = class_client.read_from_file("/etc/ssh/ssh_host_ed25519_key")
- assert (
- "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT"
- "OhteXao0Nl5DVThJ2+Q") in out
+ @pytest.mark.parametrize(
+ "config_path,expected_out",
+ (
+ (
+ "/etc/ssh/ssh_host_dsa_key.pub",
+ "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R"
+ "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM",
+ ),
+ (
+ "/etc/ssh/ssh_host_dsa_key",
+ "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr"
+ "hOVAfzZ6+jklP",
+ ),
+ (
+ "/etc/ssh/ssh_host_rsa_key.pub",
+ "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT"
+ "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4",
+ ),
+ (
+ "/etc/ssh/ssh_host_rsa_key",
+ "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un"
+ "RQvLZpMRdywBm",
+ ),
+ (
+ "/etc/ssh/ssh_host_rsa_key-cert.pub",
+ "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg"
+ "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD",
+ ),
+ (
+ "/etc/ssh/sshd_config",
+ "HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub",
+ ),
+ (
+ "/etc/ssh/ssh_host_ecdsa_key.pub",
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB"
+ "BBFsS5Tvky/IC/dXhE/afxxU",
+ ),
+ (
+ "/etc/ssh/ssh_host_ecdsa_key",
+ "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY"
+ "5mpZqxgX4vcgb",
+ ),
+ (
+ "/etc/ssh/ssh_host_ed25519_key.pub",
+ "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6"
+ "G15dqjQ2XkNVOEnb5",
+ ),
+ (
+ "/etc/ssh/ssh_host_ed25519_key",
+ "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT"
+ "OhteXao0Nl5DVThJ2+Q",
+ ),
+ ),
+ )
+ def test_ssh_provided_keys(self, config_path, expected_out, class_client):
+ out = class_client.read_from_file(config_path).strip()
+ assert expected_out in out
diff --git a/tests/integration_tests/modules/test_ssh_keysfile.py b/tests/integration_tests/modules/test_ssh_keysfile.py
new file mode 100644
index 00000000..8330a1ce
--- /dev/null
+++ b/tests/integration_tests/modules/test_ssh_keysfile.py
@@ -0,0 +1,224 @@
+from io import StringIO
+
+import paramiko
+import pytest
+from paramiko.ssh_exception import SSHException
+
+from tests.integration_tests.clouds import ImageSpecification
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.util import get_test_rsa_keypair
+
+TEST_USER1_KEYS = get_test_rsa_keypair("test1")
+TEST_USER2_KEYS = get_test_rsa_keypair("test2")
+TEST_DEFAULT_KEYS = get_test_rsa_keypair("test3")
+
+_USERDATA = """\
+#cloud-config
+bootcmd:
+ - {bootcmd}
+ssh_authorized_keys:
+ - {default}
+users:
+- default
+- name: test_user1
+ ssh_authorized_keys:
+ - {user1}
+- name: test_user2
+ ssh_authorized_keys:
+ - {user2}
+""".format(
+ bootcmd="{bootcmd}",
+ default=TEST_DEFAULT_KEYS.public_key,
+ user1=TEST_USER1_KEYS.public_key,
+ user2=TEST_USER2_KEYS.public_key,
+)
+
+
+def common_verify(client, expected_keys):
+ for user, filename, keys in expected_keys:
+ # Ensure key is in the key file
+ contents = client.read_from_file(filename)
+ if user in ["ubuntu", "root"]:
+ lines = contents.split("\n")
+ if user == "root":
+ # Our personal public key gets added by pycloudlib in
+ # addition to the default `ssh_authorized_keys`
+ assert len(lines) == 2
+ else:
+ # Clouds will insert the keys we've added to our accounts
+ # or for our launches
+ assert len(lines) >= 2
+ assert keys.public_key.strip() in contents
+ else:
+ assert contents.strip() == keys.public_key.strip()
+
+ # Ensure we can actually connect
+ ssh = paramiko.SSHClient()
+ ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ paramiko_key = paramiko.RSAKey.from_private_key(
+ StringIO(keys.private_key)
+ )
+
+ # Will fail with AuthenticationException if
+ # we cannot connect
+ ssh.connect(
+ client.instance.ip,
+ username=user,
+ pkey=paramiko_key,
+ look_for_keys=False,
+ allow_agent=False,
+ )
+
+ # Ensure other uses can't connect using our key
+ other_users = [u[0] for u in expected_keys if u[2] != keys]
+ for other_user in other_users:
+ with pytest.raises(SSHException):
+ print(
+ "trying to connect as {} with key from {}".format(
+ other_user, user
+ )
+ )
+ ssh.connect(
+ client.instance.ip,
+ username=other_user,
+ pkey=paramiko_key,
+ look_for_keys=False,
+ allow_agent=False,
+ )
+
+ # Ensure we haven't messed with any /home permissions
+ # See LP: #1940233
+ home_dir = "/home/{}".format(user)
+ # Home permissions aren't consistent between releases. On ubuntu
+ # this can change to 750 once focal is unsupported.
+ if ImageSpecification.from_os_image().release in ("bionic", "focal"):
+ home_perms = "755"
+ else:
+ home_perms = "750"
+ if user == "root":
+ home_dir = "/root"
+ home_perms = "700"
+ assert "{} {}".format(user, home_perms) == client.execute(
+ 'stat -c "%U %a" {}'.format(home_dir)
+ )
+ if client.execute("test -d {}/.ssh".format(home_dir)).ok:
+ assert "{} 700".format(user) == client.execute(
+ 'stat -c "%U %a" {}/.ssh'.format(home_dir)
+ )
+ assert "{} 600".format(user) == client.execute(
+ 'stat -c "%U %a" {}'.format(filename)
+ )
+
+ # Also ensure ssh-keygen works as expected
+ client.execute("mkdir {}/.ssh".format(home_dir))
+ assert client.execute(
+ "ssh-keygen -b 2048 -t rsa -f {}/.ssh/id_rsa -q -N ''".format(
+ home_dir
+ )
+ ).ok
+ assert client.execute("test -f {}/.ssh/id_rsa".format(home_dir))
+ assert client.execute("test -f {}/.ssh/id_rsa.pub".format(home_dir))
+
+ assert "root 755" == client.execute('stat -c "%U %a" /home')
+
+
+DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""')
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(DEFAULT_KEYS_USERDATA)
+def test_authorized_keys_default(client: IntegrationInstance):
+ expected_keys = [
+ (
+ "test_user1",
+ "/home/test_user1/.ssh/authorized_keys",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/home/test_user2/.ssh/authorized_keys",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/home/ubuntu/.ssh/authorized_keys", TEST_DEFAULT_KEYS),
+ ("root", "/root/.ssh/authorized_keys", TEST_DEFAULT_KEYS),
+ ]
+ common_verify(client, expected_keys)
+
+
+AUTHORIZED_KEYS2_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' "
+ "/etc/ssh/sshd_config"
+ )
+)
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(AUTHORIZED_KEYS2_USERDATA)
+def test_authorized_keys2(client: IntegrationInstance):
+ expected_keys = [
+ (
+ "test_user1",
+ "/home/test_user1/.ssh/authorized_keys2",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/home/test_user2/.ssh/authorized_keys2",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/home/ubuntu/.ssh/authorized_keys2", TEST_DEFAULT_KEYS),
+ ("root", "/root/.ssh/authorized_keys2", TEST_DEFAULT_KEYS),
+ ]
+ common_verify(client, expected_keys)
+
+
+NESTED_KEYS_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' "
+ "/etc/ssh/sshd_config"
+ )
+)
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(NESTED_KEYS_USERDATA)
+def test_nested_keys(client: IntegrationInstance):
+ expected_keys = [
+ ("test_user1", "/home/test_user1/foo/bar/ssh/keys", TEST_USER1_KEYS),
+ ("test_user2", "/home/test_user2/foo/bar/ssh/keys", TEST_USER2_KEYS),
+ ("ubuntu", "/home/ubuntu/foo/bar/ssh/keys", TEST_DEFAULT_KEYS),
+ ("root", "/root/foo/bar/ssh/keys", TEST_DEFAULT_KEYS),
+ ]
+ common_verify(client, expected_keys)
+
+
+EXTERNAL_KEYS_USERDATA = _USERDATA.format(
+ bootcmd=(
+ "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile "
+ "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' "
+ "/etc/ssh/sshd_config"
+ )
+)
+
+
+@pytest.mark.ubuntu
+@pytest.mark.user_data(EXTERNAL_KEYS_USERDATA)
+def test_external_keys(client: IntegrationInstance):
+ expected_keys = [
+ (
+ "test_user1",
+ "/etc/ssh/authorized_keys/test_user1/keys",
+ TEST_USER1_KEYS,
+ ),
+ (
+ "test_user2",
+ "/etc/ssh/authorized_keys/test_user2/keys",
+ TEST_USER2_KEYS,
+ ),
+ ("ubuntu", "/etc/ssh/authorized_keys/ubuntu/keys", TEST_DEFAULT_KEYS),
+ ("root", "/etc/ssh/authorized_keys/root/keys", TEST_DEFAULT_KEYS),
+ ]
+ common_verify(client, expected_keys)
diff --git a/tests/integration_tests/modules/test_timezone.py b/tests/integration_tests/modules/test_timezone.py
deleted file mode 100644
index 111d53f7..00000000
--- a/tests/integration_tests/modules/test_timezone.py
+++ /dev/null
@@ -1,25 +0,0 @@
-"""Integration test for the timezone module.
-
-This test specifies a timezone to be used by the ``timezone`` module
-and then checks that if that timezone was respected during boot.
-
-(This is ported from
-``tests/cloud_tests/testcases/modules/timezone.yaml``.)"""
-
-import pytest
-
-
-USER_DATA = """\
-#cloud-config
-timezone: US/Aleutian
-"""
-
-
-@pytest.mark.ci
-class TestTimezone:
-
- @pytest.mark.user_data(USER_DATA)
- def test_timezone(self, client):
- timezone_output = client.execute(
- 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"')
- assert timezone_output.strip() == "HDT"
diff --git a/tests/integration_tests/modules/test_user_events.py b/tests/integration_tests/modules/test_user_events.py
new file mode 100644
index 00000000..e4a4241f
--- /dev/null
+++ b/tests/integration_tests/modules/test_user_events.py
@@ -0,0 +1,110 @@
+"""Test user-overridable events.
+
+This is currently limited to applying network config on BOOT events.
+"""
+
+import re
+
+import pytest
+import yaml
+
+from tests.integration_tests.instances import IntegrationInstance
+
+
+def _add_dummy_bridge_to_netplan(client: IntegrationInstance):
+ # Update netplan configuration to ensure it doesn't change on reboot
+ netplan = yaml.safe_load(
+ client.execute("cat /etc/netplan/50-cloud-init.yaml")
+ )
+ # Just a dummy bridge to do nothing
+ try:
+ netplan["network"]["bridges"]["dummy0"] = {"dhcp4": False}
+ except KeyError:
+ netplan["network"]["bridges"] = {"dummy0": {"dhcp4": False}}
+
+ dumped_netplan = yaml.dump(netplan)
+ client.write_to_file("/etc/netplan/50-cloud-init.yaml", dumped_netplan)
+
+
+@pytest.mark.lxd_container
+@pytest.mark.lxd_vm
+@pytest.mark.ec2
+@pytest.mark.gce
+@pytest.mark.oci
+@pytest.mark.openstack
+def test_boot_event_disabled_by_default(client: IntegrationInstance):
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "network config is disabled" in log:
+ pytest.skip("network config disabled. Test doesn't apply")
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
+
+ _add_dummy_bridge_to_netplan(client)
+ client.execute("rm /var/log/cloud-init.log")
+
+ client.restart()
+ log2 = client.read_from_file("/var/log/cloud-init.log")
+
+ if "cache invalid in datasource" in log2:
+ # Invalid cache will get cleared, meaning we'll create a new
+ # "instance" and apply networking config, so events aren't
+ # really relevant here
+ pytest.skip("Test only valid for existing instances")
+
+ # We attempt to apply network config twice on every boot.
+ # Ensure neither time works.
+ assert 2 == len(
+ re.findall(
+ r"Event Denied: scopes=\['network'\] EventType=boot[^-]", log2
+ )
+ )
+ assert 2 == log2.count(
+ "Event Denied: scopes=['network'] EventType=boot-legacy"
+ )
+ assert 2 == log2.count(
+ "No network config applied. Neither a new instance"
+ " nor datasource network update allowed"
+ )
+
+ assert "dummy0" in client.execute("ls /sys/class/net")
+
+
+def _test_network_config_applied_on_reboot(client: IntegrationInstance):
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "network config is disabled" in log:
+ pytest.skip("network config disabled. Test doesn't apply")
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
+
+ _add_dummy_bridge_to_netplan(client)
+ client.execute('echo "" > /var/log/cloud-init.log')
+ client.restart()
+
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "cache invalid in datasource" in log:
+ # Invalid cache will get cleared, meaning we'll create a new
+ # "instance" and apply networking config, so events aren't
+ # really relevant here
+ pytest.skip("Test only valid for existing instances")
+
+ assert "Event Allowed: scope=network EventType=boot" in log
+ assert "Applying network configuration" in log
+ assert "dummy0" not in client.execute("ls /sys/class/net")
+
+
+@pytest.mark.azure
+def test_boot_event_enabled_by_default(client: IntegrationInstance):
+ _test_network_config_applied_on_reboot(client)
+
+
+USER_DATA = """\
+#cloud-config
+updates:
+ network:
+ when: [boot]
+"""
+
+
+@pytest.mark.user_data(USER_DATA)
+def test_boot_event_enabled(client: IntegrationInstance):
+ _test_network_config_applied_on_reboot(client)
diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py
index 6a51f5a6..fddff681 100644
--- a/tests/integration_tests/modules/test_users_groups.py
+++ b/tests/integration_tests/modules/test_users_groups.py
@@ -1,12 +1,15 @@
-"""Integration test for the user_groups module.
+"""Integration tests for the user_groups module.
-This test specifies a number of users and groups via user-data, and confirms
-that they have been configured correctly in the system under test.
+TODO:
+* This module assumes that the "ubuntu" user will be created when "default" is
+ specified; this will need modification to run on other OSes.
"""
import re
import pytest
+from tests.integration_tests.clouds import ImageSpecification
+from tests.integration_tests.instances import IntegrationInstance
USER_DATA = """\
#cloud-config
@@ -41,6 +44,13 @@ AHWYPYb2FT.lbioDm2RrkJPb9BZMN1O/
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestUsersGroups:
+ """Test users and groups.
+
+ This test specifies a number of users and groups via user-data, and
+ confirms that they have been configured correctly in the system under test.
+ """
+
+ @pytest.mark.ubuntu
@pytest.mark.parametrize(
"getent_args,regex",
[
@@ -73,7 +83,9 @@ class TestUsersGroups:
assert re.search(regex, result.stdout) is not None, (
"'getent {}' resulted in '{}', "
"but expected to match regex {}".format(
- ' '.join(getent_args), result.stdout, regex))
+ " ".join(getent_args), result.stdout, regex
+ )
+ )
def test_user_root_in_secret(self, class_client):
"""Test root user is in 'secret' group."""
@@ -81,3 +93,33 @@ class TestUsersGroups:
_, groups_str = output.split(":", maxsplit=1)
groups = groups_str.split()
assert "secret" in groups
+
+
+@pytest.mark.user_data(USER_DATA)
+def test_sudoers_includedir(client: IntegrationInstance):
+ """Ensure we don't add additional #includedir to sudoers.
+
+ Newer versions of /etc/sudoers will use @includedir rather than
+ #includedir. Ensure we handle that properly and don't include an
+ additional #includedir when one isn't warranted.
+
+ https://github.com/canonical/cloud-init/pull/783
+ """
+ if ImageSpecification.from_os_image().release in [
+ "bionic",
+ "focal",
+ ]:
+        pytest.skip(
+ "Test requires version of sudo installed on groovy and later"
+ )
+ client.execute("sed -i 's/#include/@include/g' /etc/sudoers")
+
+ sudoers = client.read_from_file("/etc/sudoers")
+ if "@includedir /etc/sudoers.d" not in sudoers:
+ client.execute("echo '@includedir /etc/sudoers.d' >> /etc/sudoers")
+ client.instance.clean()
+ client.restart()
+ sudoers = client.read_from_file("/etc/sudoers")
+
+ assert "#includedir" not in sudoers
+ assert sudoers.count("includedir /etc/sudoers.d") == 1
diff --git a/tests/integration_tests/modules/test_version_change.py b/tests/integration_tests/modules/test_version_change.py
new file mode 100644
index 00000000..3168cd60
--- /dev/null
+++ b/tests/integration_tests/modules/test_version_change.py
@@ -0,0 +1,76 @@
+from pathlib import Path
+
+import pytest
+
+from tests.integration_tests.instances import IntegrationInstance
+from tests.integration_tests.util import ASSETS_DIR, verify_clean_log
+
+PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl")
+TEST_PICKLE = ASSETS_DIR / "test_version_change.pkl"
+
+
+def _assert_no_pickle_problems(log):
+ assert "Failed loading pickled blob" not in log
+ verify_clean_log(log)
+
+
+def test_reboot_without_version_change(client: IntegrationInstance):
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected" not in log
+ assert "Cache compatibility status is currently unknown." not in log
+ _assert_no_pickle_problems(log)
+
+ client.restart()
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected" not in log
+ assert "Could not determine Python version used to write cache" not in log
+ _assert_no_pickle_problems(log)
+
+ # Now ensure that loading a bad pickle gives us problems
+ client.push_file(TEST_PICKLE, PICKLE_PATH)
+ client.restart()
+ log = client.read_from_file("/var/log/cloud-init.log")
+
+ # no cache found is an "expected" upgrade error, and
+ # "Failed" means we're unable to load the pickle
+ assert any(
+ [
+ "Failed loading pickled blob from {}".format(PICKLE_PATH) in log,
+ "no cache found" in log,
+ ]
+ )
+
+
+@pytest.mark.ec2
+@pytest.mark.gce
+@pytest.mark.oci
+@pytest.mark.openstack
+@pytest.mark.lxd_container
+@pytest.mark.lxd_vm
+# No Azure because the cache gets purged every reboot, so we'll never
+# get to the point where we need to purge cache due to version change
+def test_cache_purged_on_version_change(client: IntegrationInstance):
+ # Start by pushing the invalid pickle so we'll hit an error if the
+ # cache didn't actually get purged
+ client.push_file(TEST_PICKLE, PICKLE_PATH)
+ client.execute("echo '1.0' > /var/lib/cloud/data/python-version")
+ client.restart()
+ log = client.read_from_file("/var/log/cloud-init.log")
+ assert "Python version change detected. Purging cache" in log
+ _assert_no_pickle_problems(log)
+
+
+def test_log_message_on_missing_version_file(client: IntegrationInstance):
+ # Start by pushing a pickle so we can see the log message
+ client.push_file(TEST_PICKLE, PICKLE_PATH)
+ client.execute("rm /var/lib/cloud/data/python-version")
+ client.execute("rm /var/log/cloud-init.log")
+ client.restart()
+ log = client.read_from_file("/var/log/cloud-init.log")
+ if "no cache found" not in log:
+ # We don't expect the python version file to exist if we have no
+ # pre-existing cache
+ assert (
+ "Writing python-version file. "
+ "Cache compatibility status is currently unknown." in log
+ )
diff --git a/tests/integration_tests/modules/test_write_files.py b/tests/integration_tests/modules/test_write_files.py
index 15832ae3..1eb7e945 100644
--- a/tests/integration_tests/modules/test_write_files.py
+++ b/tests/integration_tests/modules/test_write_files.py
@@ -7,8 +7,8 @@ and then checks if those files were created during boot.
``tests/cloud_tests/testcases/modules/write_files.yaml``.)"""
import base64
-import pytest
+import pytest
ASCII_TEXT = "ASCII text"
B64_CONTENT = base64.b64encode(ASCII_TEXT.encode("utf-8"))
@@ -21,6 +21,9 @@ B64_CONTENT = base64.b64encode(ASCII_TEXT.encode("utf-8"))
#
USER_DATA = """\
#cloud-config
+users:
+- default
+- name: myuser
write_files:
- encoding: b64
content: {}
@@ -41,26 +44,50 @@ write_files:
H4sIAIDb/U8C/1NW1E/KzNMvzuBKTc7IV8hIzcnJVyjPL8pJ4QIA6N+MVxsAAAA=
path: /root/file_gzip
permissions: '0755'
-""".format(B64_CONTENT.decode("ascii"))
+- path: '/home/testuser/my-file'
+ content: |
+ echo 'hello world!'
+ defer: true
+ owner: 'myuser'
+ permissions: '0644'
+""".format(
+ B64_CONTENT.decode("ascii")
+)
@pytest.mark.ci
@pytest.mark.user_data(USER_DATA)
class TestWriteFiles:
-
@pytest.mark.parametrize(
- "cmd,expected_out", (
+ "cmd,expected_out",
+ (
("file /root/file_b64", ASCII_TEXT),
("md5sum </root/file_binary", "3801184b97bb8c6e63fa0e1eae2920d7"),
- ("sha256sum </root/file_binary", (
+ (
+ "sha256sum </root/file_binary",
"2c791c4037ea5bd7e928d6a87380f8ba"
- "7a803cd83d5e4f269e28f5090f0f2c9a"
- )),
- ("file /root/file_gzip",
- "POSIX shell script, ASCII text executable"),
+ "7a803cd83d5e4f269e28f5090f0f2c9a",
+ ),
+ (
+ "file /root/file_gzip",
+ "POSIX shell script, ASCII text executable",
+ ),
("file /root/file_text", ASCII_TEXT),
- )
+ ),
)
def test_write_files(self, cmd, expected_out, class_client):
out = class_client.execute(cmd)
assert expected_out in out
+
+ def test_write_files_deferred(self, class_client):
+ """Test that write files deferred works as expected.
+
+ Users get created after write_files module runs, so ensure that
+ with `defer: true`, the file gets written with correct ownership.
+ """
+ out = class_client.read_from_file("/home/testuser/my-file")
+ assert "echo 'hello world!'" == out
+ assert (
+ class_client.execute('stat -c "%U %a" /home/testuser/my-file')
+ == "myuser 644"
+ )