diff options
Diffstat (limited to 'tests/integration_tests/modules')
33 files changed, 2393 insertions, 315 deletions
diff --git a/tests/integration_tests/modules/test_apt.py b/tests/integration_tests/modules/test_apt.py new file mode 100644 index 00000000..adab46a8 --- /dev/null +++ b/tests/integration_tests/modules/test_apt.py @@ -0,0 +1,354 @@ +"""Series of integration tests covering apt functionality.""" +import re + +import pytest + +from cloudinit import gpg +from cloudinit.config import cc_apt_configure +from tests.integration_tests.clouds import ImageSpecification +from tests.integration_tests.instances import IntegrationInstance + +USER_DATA = """\ +#cloud-config +apt: + conf: | + APT { + Get { + Assume-Yes "true"; + Fix-Broken "true"; + } + } + primary: + - arches: [default] + uri: http://badarchive.ubuntu.com/ubuntu + security: + - arches: [default] + uri: http://badsecurity.ubuntu.com/ubuntu + sources_list: | + deb $MIRROR $RELEASE main restricted + deb-src $MIRROR $RELEASE main restricted + deb $PRIMARY $RELEASE universe restricted + deb-src $PRIMARY $RELEASE universe restricted + deb $SECURITY $RELEASE-security multiverse + deb-src $SECURITY $RELEASE-security multiverse + sources: + test_keyserver: + keyid: 110E21D8B0E2A1F0243AF6820856F197B892ACEA + keyserver: keyserver.ubuntu.com + source: "deb http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu $RELEASE main" + test_ppa: + keyid: 441614D8 + keyserver: keyserver.ubuntu.com + source: "ppa:simplestreams-dev/trunk" + test_signed_by: + keyid: A2EB2DEC0BD7519B7B38BE38376A290EC8068B11 + keyserver: keyserver.ubuntu.com + source: "deb [signed-by=$KEY_FILE] http://ppa.launchpad.net/juju/stable/ubuntu $RELEASE main" + test_bad_key: + key: "" + source: "deb $MIRROR $RELEASE main" + test_key: + source: "deb http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu $RELEASE main" + key: | + -----BEGIN PGP PUBLIC KEY BLOCK----- + Version: SKS 1.1.6 + Comment: Hostname: keyserver.ubuntu.com + + mQINBFbZRUIBEAC+A0PIKYBP9kLC4hQtRrffRS11uLo8/BdtmOdrlW0hpPHzCfKnjR3tvSEI + lqPHG1QrrjAXKZDnZMRz+h/px7lUztvytGzHPSJd5ARUzAyjyRezUhoJ3VSCxrPqx62avuWf + RfoJaIeHfDehL5/dTVkyiWxfVZ369ZX6JN2AgLsQTeybTQ75+2z0xPrrhnGmgh6g0qTYcAaq + M5ONOGiqeSBX/Smjh6ALy5XkhUiFGLsI7Yluf6XSICY/x7gd6RAfgSIQrUTNMoS1sqhT4aot + +xvOfQy8ySkfAK4NddXql6E/+ZqTmBY/Lr0YklFBy8jGT+UysfiIznPMIwbmgq5Li7BtDDtX + b8Uyi4edPpjtextezfXYn4NVIpPL5dPZS/FXh4HpzyH0pYCfrH4QDGA7i52AGmhpiOFjJMo6 + N33sdjZHOH/2Vyp+QZaQnsdUAi1N4M6c33tQbpIScn1SY+El8z5JDA4PBzkw8HpLCi1gGoa6 + V4kfbWqXXbGAJFkLkP/vc4+pY9axOlmCkJg7xCPwhI75y1cONgovhz+BEXOzolh5KZuGbGbj + xe0wva5DLBeIg7EQFf+99pOS7Syby3Xpm6ZbswEFV0cllK4jf/QMjtfInxobuMoI0GV0bE5l + WlRtPCK5FnbHwxi0wPNzB/5fwzJ77r6HgPrR0OkT0lWmbUyoOQARAQABtC1MYXVuY2hwYWQg + UFBBIGZvciBjbG91ZCBpbml0IGRldmVsb3BtZW50IHRlYW2JAjgEEwECACIFAlbZRUICGwMG + CwkIBwMCBhUIAgkKCwQWAgMBAh4BAheAAAoJEAg9Bvvk0wTfHfcP/REK5N2s1JYc69qEa9ZN + o6oi+A7l6AYw+ZY88O5TJe7F9otv5VXCIKSUT0Vsepjgf0mtXAgf/sb2lsJn/jp7tzgov3YH + vSrkTkRydz8xcA87gwQKePuvTLxQpftF4flrBxgSueIn5O/tPrBOxLz7EVYBc78SKg9aj9L2 + yUp+YuNevlwfZCTYeBb9r3FHaab2HcgkwqYch66+nKYfwiLuQ9NzXXm0Wn0JcEQ6pWvJscbj + C9BdawWovfvMK5/YLfI6Btm7F4mIpQBdhSOUp/YXKmdvHpmwxMCN2QhqYK49SM7qE9aUDbJL + arppSEBtlCLWhRBZYLTUna+BkuQ1bHz4St++XTR49Qd7vDERALpApDjB2dxPfMiBzCMwQQyq + uy13exU8o2ETLg+dZSLfDTzrBNsBFmXlw8WW17nTISYdKeGKL+QdlUjpzdwUMMzHhAO8SmMH + zjeSlDSRMXBJFAFSbCl7EwmMKa3yVX0zInT91fNllZ3iatAmtVdqVH/BFQfTIMH2ET7A8WzJ + ZzVSuMRhqoKdr5AMcHuJGPUoVkVJHQA+NNvEiXSysF3faL7jmKapmUwrhpYYX2H8pf+VMu2e + cLflKTI28dl+ZQ4Pl/aVsxrti/pzhdYy05Sn5ddtySyIkvo8L1cU5MWpbvSlFPkTstBUDLBf + pb0uBy+g0oxJQg15 + =uy53 + -----END PGP PUBLIC KEY BLOCK----- +apt_pipelining: os +""" # noqa: E501 + +EXPECTED_REGEXES = [ + r"deb http://badarchive.ubuntu.com/ubuntu [a-z]+ main restricted", + r"deb-src http://badarchive.ubuntu.com/ubuntu [a-z]+ main restricted", + r"deb http://badarchive.ubuntu.com/ubuntu [a-z]+ universe restricted", + r"deb-src http://badarchive.ubuntu.com/ubuntu [a-z]+ universe restricted", + r"deb http://badsecurity.ubuntu.com/ubuntu [a-z]+-security multiverse", + r"deb-src http://badsecurity.ubuntu.com/ubuntu [a-z]+-security multiverse", +] + +TEST_KEYSERVER_KEY = "110E 21D8 B0E2 A1F0 243A F682 0856 F197 B892 ACEA" +TEST_PPA_KEY = "3552 C902 B4DD F7BD 3842 1821 015D 28D7 4416 14D8" +TEST_KEY = "1FF0 D853 5EF7 E719 E5C8 1B9C 083D 06FB E4D3 04DF" +TEST_SIGNED_BY_KEY = "A2EB 2DEC 0BD7 519B 7B38 BE38 376A 290E C806 8B11" + + +@pytest.mark.ubuntu +@pytest.mark.user_data(USER_DATA) +class TestApt: + def get_keys(self, class_client: IntegrationInstance): + """Return all keys in /etc/apt/trusted.gpg.d/ and /etc/apt/trusted.gpg + in human readable format. Mimics the output of apt-key finger + """ + list_cmd = " ".join(gpg.GPG_LIST) + " " + keys = class_client.execute(list_cmd + cc_apt_configure.APT_LOCAL_KEYS) + print(keys) + files = class_client.execute( + "ls " + cc_apt_configure.APT_TRUSTED_GPG_DIR + ) + for file in files.split(): + path = cc_apt_configure.APT_TRUSTED_GPG_DIR + file + keys += class_client.execute(list_cmd + path) or "" + return keys + + def test_sources_list(self, class_client: IntegrationInstance): + """Integration test for the apt module's `sources_list` functionality. + + This test specifies a ``sources_list`` and then checks that (a) the + expected number of sources.list entries is present, and (b) that each + expected line appears in the file. + + (This is ported from + `tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml`.) + """ + sources_list = class_client.read_from_file("/etc/apt/sources.list") + assert 6 == len(sources_list.rstrip().split("\n")) + + for expected_re in EXPECTED_REGEXES: + assert re.search(expected_re, sources_list) is not None + + def test_apt_conf(self, class_client: IntegrationInstance): + """Test the apt conf functionality. + + Ported from tests/cloud_tests/testcases/modules/apt_configure_conf.py + """ + apt_config = class_client.read_from_file( + "/etc/apt/apt.conf.d/94cloud-init-config" + ) + assert 'Assume-Yes "true";' in apt_config + assert 'Fix-Broken "true";' in apt_config + + def test_ppa_source(self, class_client: IntegrationInstance): + """Test the apt ppa functionality. + + Ported from + tests/cloud_tests/testcases/modules/apt_configure_sources_ppa.py + """ + release = ImageSpecification.from_os_image().release + ppa_path_contents = class_client.read_from_file( + "/etc/apt/sources.list.d/" + "simplestreams-dev-ubuntu-trunk-{}.list".format(release) + ) + + assert ( + "http://ppa.launchpad.net/simplestreams-dev/trunk/ubuntu" + in ppa_path_contents + ) + + assert TEST_PPA_KEY in self.get_keys(class_client) + + def test_signed_by(self, class_client: IntegrationInstance): + """Test the apt signed-by functionality.""" + release = ImageSpecification.from_os_image().release + source = ( + "deb [signed-by=/etc/apt/cloud-init.gpg.d/test_signed_by.gpg] " + "http://ppa.launchpad.net/juju/stable/ubuntu" + " {} main".format(release) + ) + path_contents = class_client.read_from_file( + "/etc/apt/sources.list.d/test_signed_by.list" + ) + assert path_contents == source + + key = class_client.execute( + "gpg --no-default-keyring --with-fingerprint --list-keys " + "--keyring /etc/apt/cloud-init.gpg.d/test_signed_by.gpg" + ) + + assert TEST_SIGNED_BY_KEY in key + + def test_bad_key(self, class_client: IntegrationInstance): + """Test the apt signed-by functionality.""" + with pytest.raises(OSError): + class_client.read_from_file( + "/etc/apt/trusted.list.d/test_bad_key.gpg" + ) + + def test_key(self, class_client: IntegrationInstance): + """Test the apt key functionality. + + Ported from + tests/cloud_tests/testcases/modules/apt_configure_sources_key.py + """ + test_archive_contents = class_client.read_from_file( + "/etc/apt/sources.list.d/test_key.list" + ) + + assert ( + "http://ppa.launchpad.net/cloud-init-dev/test-archive/ubuntu" + in test_archive_contents + ) + assert TEST_KEY in self.get_keys(class_client) + + def test_keyserver(self, class_client: IntegrationInstance): + """Test the apt keyserver functionality. + + Ported from + tests/cloud_tests/testcases/modules/apt_configure_sources_keyserver.py + """ + test_keyserver_contents = class_client.read_from_file( + "/etc/apt/sources.list.d/test_keyserver.list" + ) + + assert ( + "http://ppa.launchpad.net/canonical-kernel-team/ppa/ubuntu" + in test_keyserver_contents + ) + + assert TEST_KEYSERVER_KEY in self.get_keys(class_client) + + def test_os_pipelining(self, class_client: IntegrationInstance): + """Test 'os' settings does not write apt config file. + + Ported from tests/cloud_tests/testcases/modules/apt_pipelining_os.py + """ + conf_exists = class_client.execute( + "test -f /etc/apt/apt.conf.d/90cloud-init-pipelining" + ).ok + assert conf_exists is False + + +_DEFAULT_DATA = """\ +#cloud-config +apt: + primary: + - arches: + - default + {uri} + security: + - arches: + - default +""" +DEFAULT_DATA = _DEFAULT_DATA.format(uri="") + + +@pytest.mark.ubuntu +@pytest.mark.user_data(DEFAULT_DATA) +class TestDefaults: + @pytest.mark.openstack + def test_primary_on_openstack(self, class_client: IntegrationInstance): + """Test apt default primary source on openstack. + + When no uri is provided. + """ + zone = class_client.execute("cloud-init query v1.availability_zone") + sources_list = class_client.read_from_file("/etc/apt/sources.list") + assert "{}.clouds.archive.ubuntu.com".format(zone) in sources_list + + def test_security(self, class_client: IntegrationInstance): + """Test apt default security sources. + + Ported from + tests/cloud_tests/testcases/modules/apt_configure_security.py + """ + sources_list = class_client.read_from_file("/etc/apt/sources.list") + + # 3 lines from main, universe, and multiverse + sec_url = "deb http://security.ubuntu.com/ubuntu" + if class_client.settings.PLATFORM == "azure": + sec_url = ( + "deb http://azure.archive.ubuntu.com/ubuntu/ jammy-security" + ) + sec_src_url = sec_url.replace("deb ", "# deb-src ") + assert 3 == sources_list.count(sec_url) + assert 3 == sources_list.count(sec_src_url) + + +DEFAULT_DATA_WITH_URI = _DEFAULT_DATA.format( + uri='uri: "http://something.random.invalid/ubuntu"' +) + + +@pytest.mark.user_data(DEFAULT_DATA_WITH_URI) +def test_default_primary_with_uri(client: IntegrationInstance): + """Test apt default primary sources. + + Ported from + tests/cloud_tests/testcases/modules/apt_configure_primary.py + """ + sources_list = client.read_from_file("/etc/apt/sources.list") + assert "archive.ubuntu.com" not in sources_list + + assert "something.random.invalid" in sources_list + + +DISABLED_DATA = """\ +#cloud-config +apt: + disable_suites: + - $RELEASE + - $RELEASE-updates + - $RELEASE-backports + - $RELEASE-security +apt_pipelining: false +""" + + +@pytest.mark.ubuntu +@pytest.mark.user_data(DISABLED_DATA) +class TestDisabled: + def test_disable_suites(self, class_client: IntegrationInstance): + """Test disabling of apt suites. + + Ported from + tests/cloud_tests/testcases/modules/apt_configure_disable_suites.py + """ + sources_list = class_client.execute( + "cat /etc/apt/sources.list | grep -v '^#'" + ).strip() + assert "" == sources_list + + def test_disable_apt_pipelining(self, class_client: IntegrationInstance): + """Test disabling of apt pipelining. + + Ported from + tests/cloud_tests/testcases/modules/apt_pipelining_disable.py + """ + conf = class_client.read_from_file( + "/etc/apt/apt.conf.d/90cloud-init-pipelining" + ) + assert 'Acquire::http::Pipeline-Depth "0";' in conf + + +APT_PROXY_DATA = """\ +#cloud-config +apt: + proxy: "http://proxy.internal:3128" + http_proxy: "http://squid.internal:3128" + ftp_proxy: "ftp://squid.internal:3128" + https_proxy: "https://squid.internal:3128" +""" + + +@pytest.mark.ubuntu +@pytest.mark.user_data(APT_PROXY_DATA) +def test_apt_proxy(client: IntegrationInstance): + """Test the apt proxy data gets written correctly.""" + out = client.read_from_file("/etc/apt/apt.conf.d/90cloud-init-aptproxy") + assert 'Acquire::http::Proxy "http://proxy.internal:3128";' in out + assert 'Acquire::http::Proxy "http://squid.internal:3128";' in out + assert 'Acquire::ftp::Proxy "ftp://squid.internal:3128";' in out + assert 'Acquire::https::Proxy "https://squid.internal:3128";' in out diff --git a/tests/integration_tests/modules/test_apt_configure_sources_list.py b/tests/integration_tests/modules/test_apt_configure_sources_list.py deleted file mode 100644 index d2bcc61a..00000000 --- a/tests/integration_tests/modules/test_apt_configure_sources_list.py +++ /dev/null @@ -1,51 +0,0 @@ -"""Integration test for the apt module's ``sources_list`` functionality. - -This test specifies a ``sources_list`` and then checks that (a) the expected -number of sources.list entries is present, and (b) that each expected line -appears in the file. - -(This is ported from -``tests/cloud_tests/testcases/modules/apt_configure_sources_list.yaml``.)""" -import re - -import pytest - - -USER_DATA = """\ -#cloud-config -apt: - primary: - - arches: [default] - uri: http://archive.ubuntu.com/ubuntu - security: - - arches: [default] - uri: http://security.ubuntu.com/ubuntu - sources_list: | - deb $MIRROR $RELEASE main restricted - deb-src $MIRROR $RELEASE main restricted - deb $PRIMARY $RELEASE universe restricted - deb-src $PRIMARY $RELEASE universe restricted - deb $SECURITY $RELEASE-security multiverse - deb-src $SECURITY $RELEASE-security multiverse -""" - -EXPECTED_REGEXES = [ - r"deb http://archive.ubuntu.com/ubuntu [a-z].* main restricted", - r"deb-src http://archive.ubuntu.com/ubuntu [a-z].* main restricted", - r"deb http://archive.ubuntu.com/ubuntu [a-z].* universe restricted", - r"deb-src http://archive.ubuntu.com/ubuntu [a-z].* universe restricted", - r"deb http://security.ubuntu.com/ubuntu [a-z].*security multiverse", - r"deb-src http://security.ubuntu.com/ubuntu [a-z].*security multiverse", -] - - -@pytest.mark.ci -class TestAptConfigureSourcesList: - - @pytest.mark.user_data(USER_DATA) - def test_sources_list(self, client): - sources_list = client.read_from_file("/etc/apt/sources.list") - assert 6 == len(sources_list.rstrip().split('\n')) - - for expected_re in EXPECTED_REGEXES: - assert re.search(expected_re, sources_list) is not None diff --git a/tests/integration_tests/modules/test_ca_certs.py b/tests/integration_tests/modules/test_ca_certs.py new file mode 100644 index 00000000..7247fd7d --- /dev/null +++ b/tests/integration_tests/modules/test_ca_certs.py @@ -0,0 +1,90 @@ +"""Integration tests for cc_ca_certs. + +(This is ported from ``tests/cloud_tests//testcases/modules/ca_certs.yaml``.) + +TODO: +* Mark this as running on Debian and Alpine (once we have marks for that) +* Implement testing for the RHEL-specific paths +""" +import os.path + +import pytest + +USER_DATA = """\ +#cloud-config +ca_certs: + remove_defaults: true + trusted: + - | + -----BEGIN CERTIFICATE----- + MIIGJzCCBA+gAwIBAgIBATANBgkqhkiG9w0BAQUFADCBsjELMAkGA1UEBhMCRlIx + DzANBgNVBAgMBkFsc2FjZTETMBEGA1UEBwwKU3RyYXNib3VyZzEYMBYGA1UECgwP + d3d3LmZyZWVsYW4ub3JnMRAwDgYDVQQLDAdmcmVlbGFuMS0wKwYDVQQDDCRGcmVl + bGFuIFNhbXBsZSBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkxIjAgBgkqhkiG9w0BCQEW + E2NvbnRhY3RAZnJlZWxhbi5vcmcwHhcNMTIwNDI3MTAzMTE4WhcNMjIwNDI1MTAz + MTE4WjB+MQswCQYDVQQGEwJGUjEPMA0GA1UECAwGQWxzYWNlMRgwFgYDVQQKDA93 + d3cuZnJlZWxhbi5vcmcxEDAOBgNVBAsMB2ZyZWVsYW4xDjAMBgNVBAMMBWFsaWNl + MSIwIAYJKoZIhvcNAQkBFhNjb250YWN0QGZyZWVsYW4ub3JnMIICIjANBgkqhkiG + 9w0BAQEFAAOCAg8AMIICCgKCAgEA3W29+ID6194bH6ejLrIC4hb2Ugo8v6ZC+Mrc + k2dNYMNPjcOKABvxxEtBamnSaeU/IY7FC/giN622LEtV/3oDcrua0+yWuVafyxmZ + yTKUb4/GUgafRQPf/eiX9urWurtIK7XgNGFNUjYPq4dSJQPPhwCHE/LKAykWnZBX + RrX0Dq4XyApNku0IpjIjEXH+8ixE12wH8wt7DEvdO7T3N3CfUbaITl1qBX+Nm2Z6 + q4Ag/u5rl8NJfXg71ZmXA3XOj7zFvpyapRIZcPmkvZYn7SMCp8dXyXHPdpSiIWL2 + uB3KiO4JrUYvt2GzLBUThp+lNSZaZ/Q3yOaAAUkOx+1h08285Pi+P8lO+H2Xic4S + vMq1xtLg2bNoPC5KnbRfuFPuUD2/3dSiiragJ6uYDLOyWJDivKGt/72OVTEPAL9o + 6T2pGZrwbQuiFGrGTMZOvWMSpQtNl+tCCXlT4mWqJDRwuMGrI4DnnGzt3IKqNwS4 + Qyo9KqjMIPwnXZAmWPm3FOKe4sFwc5fpawKO01JZewDsYTDxVj+cwXwFxbE2yBiF + z2FAHwfopwaH35p3C6lkcgP2k/zgAlnBluzACUI+MKJ/G0gv/uAhj1OHJQ3L6kn1 + SpvQ41/ueBjlunExqQSYD7GtZ1Kg8uOcq2r+WISE3Qc9MpQFFkUVllmgWGwYDuN3 + Zsez95kCAwEAAaN7MHkwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0EHxYdT3BlblNT + TCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0OBBYEFFlfyRO6G8y5qEFKikl5 + ajb2fT7XMB8GA1UdIwQYMBaAFCNsLT0+KV14uGw+quK7Lh5sh/JTMA0GCSqGSIb3 + DQEBBQUAA4ICAQAT5wJFPqervbja5+90iKxi1d0QVtVGB+z6aoAMuWK+qgi0vgvr + mu9ot2lvTSCSnRhjeiP0SIdqFMORmBtOCFk/kYDp9M/91b+vS+S9eAlxrNCB5VOf + PqxEPp/wv1rBcE4GBO/c6HcFon3F+oBYCsUQbZDKSSZxhDm3mj7pb67FNbZbJIzJ + 70HDsRe2O04oiTx+h6g6pW3cOQMgIAvFgKN5Ex727K4230B0NIdGkzuj4KSML0NM + slSAcXZ41OoSKNjy44BVEZv0ZdxTDrRM4EwJtNyggFzmtTuV02nkUj1bYYYC5f0L + ADr6s0XMyaNk8twlWYlYDZ5uKDpVRVBfiGcq0uJIzIvemhuTrofh8pBQQNkPRDFT + Rq1iTo1Ihhl3/Fl1kXk1WR3jTjNb4jHX7lIoXwpwp767HAPKGhjQ9cFbnHMEtkro + RlJYdtRq5mccDtwT0GFyoJLLBZdHHMHJz0F9H7FNk2tTQQMhK5MVYwg+LIaee586 + CQVqfbscp7evlgjLW98H+5zylRHAgoH2G79aHljNKMp9BOuq6SnEglEsiWGVtu2l + hnx8SB3sVJZHeer8f/UQQwqbAO+Kdy70NmbSaqaVtp8jOxLiidWkwSyRTsuU6D8i + DiH5uEqBXExjrj0FslxcVKdVj5glVcSmkLwZKbEU1OKwleT/iXFhvooWhQ== + -----END CERTIFICATE----- +""" + + +@pytest.mark.ubuntu +@pytest.mark.user_data(USER_DATA) +class TestCaCerts: + def test_certs_updated(self, class_client): + """Test that /etc/ssl/certs is updated as we expect.""" + root = "/etc/ssl/certs" + filenames = class_client.execute(["ls", "-1", root]).splitlines() + unlinked_files = [] + links = {} + for filename in filenames: + full_path = os.path.join(root, filename) + symlink_target = class_client.execute(["readlink", full_path]) + is_symlink = symlink_target.ok + if is_symlink: + links[filename] = symlink_target + else: + unlinked_files.append(filename) + + assert ["ca-certificates.crt"] == unlinked_files + assert "cloud-init-ca-certs.pem" == links["a535c1f3.0"] + assert ( + "/usr/share/ca-certificates/cloud-init-ca-certs.crt" + == links["cloud-init-ca-certs.pem"] + ) + + def test_cert_installed(self, class_client): + """Test that our specified cert has been installed""" + checksum = class_client.execute( + "sha256sum /etc/ssl/certs/ca-certificates.crt" + ) + assert ( + "78e875f18c73c1aab9167ae0bd323391e52222cc2dbcda42d129537219300062" + in checksum + ) diff --git a/tests/integration_tests/modules/test_cli.py b/tests/integration_tests/modules/test_cli.py new file mode 100644 index 00000000..baaa7567 --- /dev/null +++ b/tests/integration_tests/modules/test_cli.py @@ -0,0 +1,81 @@ +"""Integration tests for CLI functionality + +These would be for behavior manually invoked by user from the command line +""" + +import pytest + +from tests.integration_tests.instances import IntegrationInstance + +VALID_USER_DATA = """\ +#cloud-config +runcmd: + - echo 'hi' > /var/tmp/test +""" + +INVALID_USER_DATA_HEADER = """\ +runcmd: + - echo 'hi' > /var/tmp/test +""" + +INVALID_USER_DATA_SCHEMA = """\ +#cloud-config +updates: + notnetwork: -1 +apt_pipelining: bogus +""" + + +@pytest.mark.user_data(VALID_USER_DATA) +def test_valid_userdata(client: IntegrationInstance): + """Test `cloud-init devel schema` with valid userdata. + + PR #575 + """ + result = client.execute("cloud-init devel schema --system") + assert result.ok + assert "Valid cloud-config: system userdata" == result.stdout.strip() + result = client.execute("cloud-init status --long") + if not result.ok: + raise AssertionError( + f"Unexpected error from cloud-init status: {result}" + ) + + +@pytest.mark.user_data(INVALID_USER_DATA_HEADER) +def test_invalid_userdata(client: IntegrationInstance): + """Test `cloud-init devel schema` with invalid userdata. + + PR #575 + """ + result = client.execute("cloud-init devel schema --system") + assert not result.ok + assert "Cloud config schema errors" in result.stderr + assert 'needs to begin with "#cloud-config"' in result.stderr + result = client.execute("cloud-init status --long") + if not result.ok: + raise AssertionError( + f"Unexpected error from cloud-init status: {result}" + ) + + +@pytest.mark.user_data(INVALID_USER_DATA_SCHEMA) +def test_invalid_userdata_schema(client: IntegrationInstance): + """Test invalid schema represented as Warnings, not fatal + + PR #1175 + """ + result = client.execute("cloud-init status --long") + assert result.ok + log = client.read_from_file("/var/log/cloud-init.log") + warning = ( + "[WARNING]: Invalid cloud-config provided:\napt_pipelining: 'bogus'" + " is not valid under any of the given schemas\nupdates: Additional" + " properties are not allowed ('notnetwork' was unexpected)" + ) + assert warning in log + result = client.execute("cloud-init status --long") + if not result.ok: + raise AssertionError( + f"Unexpected error from cloud-init status: {result}" + ) diff --git a/tests/integration_tests/modules/test_combined.py b/tests/integration_tests/modules/test_combined.py new file mode 100644 index 00000000..7a9a6e27 --- /dev/null +++ b/tests/integration_tests/modules/test_combined.py @@ -0,0 +1,342 @@ +# This file is part of cloud-init. See LICENSE file for license information. +"""A set of somewhat unrelated tests that can be combined into a single +instance launch. Generally tests should only be added here if a failure +of the test would be unlikely to affect the running of another test using +the same instance launch. Most independent module coherence tests can go +here. +""" +import json +import re + +import pytest + +from tests.integration_tests.clouds import ImageSpecification +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.util import ( + retry, + verify_clean_log, + verify_ordered_items_in_text, +) + +USER_DATA = """\ +#cloud-config +apt: + primary: + - arches: [default] + uri: http://us.archive.ubuntu.com/ubuntu/ +byobu_by_default: enable +final_message: | + This is my final message! + $version + $timestamp + $datasource + $uptime +locale: en_GB.UTF-8 +locale_configfile: /etc/default/locale +ntp: + servers: ['ntp.ubuntu.com'] +package_update: true +random_seed: + data: 'MYUb34023nD:LFDK10913jk;dfnk:Df' + encoding: raw + file: /root/seed +rsyslog: + configs: + - "*.* @@127.0.0.1" + - filename: 0-basic-config.conf + content: | + module(load="imtcp") + input(type="imtcp" port="514") + $template RemoteLogs,"/var/tmp/rsyslog.log" + *.* ?RemoteLogs + & ~ + remotes: + me: "127.0.0.1" +runcmd: + - echo 'hello world' > /var/tmp/runcmd_output + + - # + - logger "My test log" +snap: + squashfuse_in_container: true + commands: + - snap install hello-world +ssh_import_id: + - gh:powersj + - lp:smoser +timezone: US/Aleutian +""" + + +@pytest.mark.ci +@pytest.mark.user_data(USER_DATA) +class TestCombined: + def test_final_message(self, class_client: IntegrationInstance): + """Test that final_message module works as expected. + + Also tests LP 1511485: final_message is silent. + """ + client = class_client + log = client.read_from_file("/var/log/cloud-init.log") + expected = ( + "This is my final message!\n" + r"\d+\.\d+.*\n" + r"\w{3}, \d{2} \w{3} \d{4} \d{2}:\d{2}:\d{2} \+\d{4}\n" # Datetime + "DataSource.*\n" + r"\d+\.\d+" + ) + + assert re.search(expected, log) + + def test_ntp_with_apt(self, class_client: IntegrationInstance): + """LP #1628337. + + cloud-init tries to install NTP before even + configuring the archives. + """ + client = class_client + log = client.read_from_file("/var/log/cloud-init.log") + assert "W: Failed to fetch" not in log + assert "W: Some index files failed to download" not in log + assert "E: Unable to locate package ntp" not in log + + def test_byobu(self, class_client: IntegrationInstance): + """Test byobu configured as enabled by default.""" + client = class_client + assert client.execute('test -e "/etc/byobu/autolaunch"').ok + + def test_configured_locale(self, class_client: IntegrationInstance): + """Test locale can be configured correctly.""" + client = class_client + default_locale = client.read_from_file("/etc/default/locale") + assert "LANG=en_GB.UTF-8" in default_locale + + locale_a = client.execute("locale -a") + verify_ordered_items_in_text(["en_GB.utf8", "en_US.utf8"], locale_a) + + locale_gen = client.execute( + "cat /etc/locale.gen | grep -v '^#' | uniq" + ) + verify_ordered_items_in_text( + ["en_GB.UTF-8", "en_US.UTF-8"], locale_gen + ) + + def test_random_seed_data(self, class_client: IntegrationInstance): + """Integration test for the random seed module. + + This test specifies a command to be executed by the ``seed_random`` + module, by providing a different data to be used as seed data. We will + then check if that seed data was actually used. + """ + client = class_client + + # Only read the first 31 characters, because the rest could be + # binary data + result = client.execute("head -c 31 < /root/seed") + assert result.startswith("MYUb34023nD:LFDK10913jk;dfnk:Df") + + def test_rsyslog(self, class_client: IntegrationInstance): + """Test rsyslog is configured correctly.""" + client = class_client + assert "My test log" in client.read_from_file("/var/tmp/rsyslog.log") + + def test_runcmd(self, class_client: IntegrationInstance): + """Test runcmd works as expected""" + client = class_client + assert "hello world" == client.read_from_file("/var/tmp/runcmd_output") + + @retry(tries=30, delay=1) + def test_ssh_import_id(self, class_client: IntegrationInstance): + """Integration test for the ssh_import_id module. + + This test specifies ssh keys to be imported by the ``ssh_import_id`` + module and then checks that if the ssh keys were successfully imported. + + TODO: + * This test assumes that SSH keys will be imported into the + /home/ubuntu; this will need modification to run on other OSes. + """ + client = class_client + ssh_output = client.read_from_file("/home/ubuntu/.ssh/authorized_keys") + + assert "# ssh-import-id gh:powersj" in ssh_output + assert "# ssh-import-id lp:smoser" in ssh_output + + def test_snap(self, class_client: IntegrationInstance): + """Integration test for the snap module. + + This test specifies a command to be executed by the ``snap`` module + and then checks that if that command was executed during boot. + """ + client = class_client + snap_output = client.execute("snap list") + assert "core " in snap_output + assert "hello-world " in snap_output + + def test_timezone(self, class_client: IntegrationInstance): + """Integration test for the timezone module. + + This test specifies a timezone to be used by the ``timezone`` module + and then checks that if that timezone was respected during boot. + """ + client = class_client + timezone_output = client.execute( + 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"' + ) + assert timezone_output.strip() == "HDT" + + def test_no_problems(self, class_client: IntegrationInstance): + """Test no errors, warnings, or tracebacks""" + client = class_client + status_file = client.read_from_file("/run/cloud-init/status.json") + status_json = json.loads(status_file)["v1"] + for stage in ("init", "init-local", "modules-config", "modules-final"): + assert status_json[stage]["errors"] == [] + result_file = client.read_from_file("/run/cloud-init/result.json") + result_json = json.loads(result_file)["v1"] + assert result_json["errors"] == [] + + log = client.read_from_file("/var/log/cloud-init.log") + verify_clean_log(log) + + def test_correct_datasource_detected( + self, class_client: IntegrationInstance + ): + """Test datasource is detected at the proper boot stage.""" + client = class_client + status_file = client.read_from_file("/run/cloud-init/status.json") + parsed_datasource = json.loads(status_file)["v1"]["datasource"] + + if client.settings.PLATFORM in ["lxd_container", "lxd_vm"]: + assert parsed_datasource.startswith("DataSourceNoCloud") + else: + platform_datasources = { + "azure": "DataSourceAzure [seed=/dev/sr0]", + "ec2": "DataSourceEc2Local", + "gce": "DataSourceGCELocal", + "oci": "DataSourceOracle", + "openstack": "DataSourceOpenStackLocal [net,ver=2]", + } + assert ( + platform_datasources[client.settings.PLATFORM] + == parsed_datasource + ) + + def test_cloud_id_file_symlink(self, class_client: IntegrationInstance): + cloud_id = class_client.execute("cloud-id").stdout + expected_link_output = ( + "'/run/cloud-init/cloud-id' -> " + f"'/run/cloud-init/cloud-id-{cloud_id}'" + ) + assert expected_link_output == str( + class_client.execute("stat -c %N /run/cloud-init/cloud-id") + ) + + def _check_common_metadata(self, data): + assert data["base64_encoded_keys"] == [] + assert data["merged_cfg"] == "redacted for non-root user" + + image_spec = ImageSpecification.from_os_image() + assert data["sys_info"]["dist"][0] == image_spec.os + + v1_data = data["v1"] + assert re.match(r"\d\.\d+\.\d+-\d+", v1_data["kernel_release"]) + assert v1_data["variant"] == image_spec.os + assert v1_data["distro"] == image_spec.os + assert v1_data["distro_release"] == image_spec.release + assert v1_data["machine"] == "x86_64" + assert re.match(r"3.\d\.\d", v1_data["python_version"]) + + @pytest.mark.lxd_container + def test_instance_json_lxd(self, class_client: IntegrationInstance): + client = class_client + instance_json_file = client.read_from_file( + "/run/cloud-init/instance-data.json" + ) + + data = json.loads(instance_json_file) + self._check_common_metadata(data) + v1_data = data["v1"] + assert v1_data["cloud_name"] == "unknown" + assert v1_data["platform"] == "lxd" + assert v1_data["cloud_id"] == "lxd" + assert f"{v1_data['cloud_id']}" == client.read_from_file( + "/run/cloud-init/cloud-id-lxd" + ) + assert ( + v1_data["subplatform"] + == "seed-dir (/var/lib/cloud/seed/nocloud-net)" + ) + assert v1_data["availability_zone"] is None + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"] == client.instance.name + assert v1_data["region"] is None + + @pytest.mark.lxd_vm + def test_instance_json_lxd_vm(self, class_client: IntegrationInstance): + client = class_client + instance_json_file = client.read_from_file( + "/run/cloud-init/instance-data.json" + ) + + data = json.loads(instance_json_file) + self._check_common_metadata(data) + v1_data = data["v1"] + assert v1_data["cloud_name"] == "unknown" + assert v1_data["platform"] == "lxd" + assert v1_data["cloud_id"] == "lxd" + assert f"{v1_data['cloud_id']}" == client.read_from_file( + "/run/cloud-init/cloud-id-lxd" + ) + assert any( + [ + "/var/lib/cloud/seed/nocloud-net" in v1_data["subplatform"], + "/dev/sr0" in v1_data["subplatform"], + ] + ) + assert v1_data["availability_zone"] is None + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"] == client.instance.name + assert v1_data["region"] is None + + @pytest.mark.ec2 + def test_instance_json_ec2(self, class_client: IntegrationInstance): + client = class_client + instance_json_file = client.read_from_file( + "/run/cloud-init/instance-data.json" + ) + data = json.loads(instance_json_file) + v1_data = data["v1"] + assert v1_data["cloud_name"] == "aws" + assert v1_data["platform"] == "ec2" + # Different regions will show up as ec2-(gov|china) + assert v1_data["cloud_id"].startswith("ec2") + assert f"{v1_data['cloud_id']}" == client.read_from_file( + "/run/cloud-init/cloud-id-ec2" + ) + assert v1_data["subplatform"].startswith("metadata") + assert ( + v1_data["availability_zone"] == client.instance.availability_zone + ) + assert v1_data["instance_id"] == client.instance.name + assert v1_data["local_hostname"].startswith("ip-") + assert v1_data["region"] == client.cloud.cloud_instance.region + + @pytest.mark.gce + def test_instance_json_gce(self, class_client: IntegrationInstance): + client = class_client + instance_json_file = client.read_from_file( + "/run/cloud-init/instance-data.json" + ) + data = json.loads(instance_json_file) + self._check_common_metadata(data) + v1_data = data["v1"] + assert v1_data["cloud_name"] == "gce" + assert v1_data["platform"] == "gce" + assert f"{v1_data['cloud_id']}" == client.read_from_file( + "/run/cloud-init/cloud-id-gce" + ) + assert v1_data["subplatform"].startswith("metadata") + assert v1_data["availability_zone"] == client.instance.zone + assert v1_data["instance_id"] == client.instance.instance_id + assert v1_data["local_hostname"] == client.instance.name diff --git a/tests/integration_tests/modules/test_command_output.py b/tests/integration_tests/modules/test_command_output.py new file mode 100644 index 00000000..96525cac --- /dev/null +++ b/tests/integration_tests/modules/test_command_output.py @@ -0,0 +1,21 @@ +"""Integration test for output redirection. + +This test redirects the output of a command to a file and then checks the file. + +(This is ported from +``tests/cloud_tests/testcases/main/command_output_simple.yaml``.)""" +import pytest + +from tests.integration_tests.instances import IntegrationInstance + +USER_DATA = """\ +#cloud-config +output: { all: "| tee -a /var/log/cloud-init-test-output" } +final_message: "should be last line in cloud-init-test-output file" +""" + + +@pytest.mark.user_data(USER_DATA) +def test_runcmd(client: IntegrationInstance): + log = client.read_from_file("/var/log/cloud-init-test-output") + assert "should be last line in cloud-init-test-output file" in log diff --git a/tests/integration_tests/modules/test_disk_setup.py b/tests/integration_tests/modules/test_disk_setup.py new file mode 100644 index 00000000..7aaba7db --- /dev/null +++ b/tests/integration_tests/modules/test_disk_setup.py @@ -0,0 +1,212 @@ +import json +import os +from uuid import uuid4 + +import pytest +from pycloudlib.lxd.instance import LXDInstance + +from cloudinit.subp import subp +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.util import verify_clean_log + +DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4()) + + +def setup_and_mount_lxd_disk(instance: LXDInstance): + subp( + "lxc config device add {} test-disk-setup-disk disk source={}".format( + instance.name, DISK_PATH + ).split() + ) + + +@pytest.fixture +def create_disk(): + # 640k should be enough for anybody + subp("dd if=/dev/zero of={} bs=1k count=640".format(DISK_PATH).split()) + yield + os.remove(DISK_PATH) + + +ALIAS_USERDATA = """\ +#cloud-config +device_aliases: + my_alias: /dev/sdb +disk_setup: + my_alias: + table_type: mbr + layout: [50, 50] + overwrite: True +fs_setup: +- label: fs1 + device: my_alias.1 + filesystem: ext4 +- label: fs2 + device: my_alias.2 + filesystem: ext4 +mounts: +- ["my_alias.1", "/mnt1"] +- ["my_alias.2", "/mnt2"] +""" + + +@pytest.mark.user_data(ALIAS_USERDATA) +@pytest.mark.lxd_setup.with_args(setup_and_mount_lxd_disk) +@pytest.mark.ubuntu +@pytest.mark.lxd_vm +class TestDeviceAliases: + """Test devices aliases work on disk setup/mount""" + + def test_device_alias(self, create_disk, client: IntegrationInstance): + log = client.read_from_file("/var/log/cloud-init.log") + assert ( + "updated disk_setup device entry 'my_alias' to '/dev/sdb'" in log + ) + assert "changed my_alias.1 => /dev/sdb1" in log + assert "changed my_alias.2 => /dev/sdb2" in log + verify_clean_log(log) + + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 2 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["children"][1]["name"] == "sdb2" + if "mountpoint" in sdb["children"][0]: + assert sdb["children"][0]["mountpoint"] == "/mnt1" + assert sdb["children"][1]["mountpoint"] == "/mnt2" + else: + assert sdb["children"][0]["mountpoints"] == ["/mnt1"] + assert sdb["children"][1]["mountpoints"] == ["/mnt2"] + result = client.execute("mount -a") + assert result.return_code == 0 + assert result.stdout.strip() == "" + assert result.stderr.strip() == "" + result = client.execute("findmnt -J /mnt1") + assert result.return_code == 0 + result = client.execute("findmnt -J /mnt2") + assert result.return_code == 0 + + +PARTPROBE_USERDATA = """\ +#cloud-config +disk_setup: + /dev/sdb: + table_type: mbr + layout: [50, 50] + overwrite: True +fs_setup: + - label: test + device: /dev/sdb1 + filesystem: ext4 + - label: test2 + device: /dev/sdb2 + filesystem: ext4 +mounts: +- ["/dev/sdb1", "/mnt1"] +- ["/dev/sdb2", "/mnt2"] +""" + +UPDATED_PARTPROBE_USERDATA = """\ +#cloud-config +disk_setup: + /dev/sdb: + table_type: mbr + layout: [100] + overwrite: True +fs_setup: + - label: test3 + device: /dev/sdb1 + filesystem: ext4 +mounts: +- ["/dev/sdb1", "/mnt3"] +""" + + +@pytest.mark.user_data(PARTPROBE_USERDATA) +@pytest.mark.lxd_setup.with_args(setup_and_mount_lxd_disk) +@pytest.mark.ubuntu +@pytest.mark.lxd_vm +class TestPartProbeAvailability: + """Test disk setup works with partprobe + + Disk setup can run successfully on a mounted partition when + partprobe is being used. + + lp-1920939 + """ + + def _verify_first_disk_setup(self, client, log): + verify_clean_log(log) + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 2 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["children"][1]["name"] == "sdb2" + if "mountpoint" in sdb["children"][0]: + assert sdb["children"][0]["mountpoint"] == "/mnt1" + assert sdb["children"][1]["mountpoint"] == "/mnt2" + else: + assert sdb["children"][0]["mountpoints"] == ["/mnt1"] + assert sdb["children"][1]["mountpoints"] == ["/mnt2"] + + # Not bionic because the LXD agent gets in the way of us + # changing the userdata + @pytest.mark.not_bionic + def test_disk_setup_when_mounted( + self, create_disk, client: IntegrationInstance + ): + """Test lp-1920939. + + We insert an extra disk into our VM, format it to have two partitions, + modify our cloud config to mount devices before disk setup, and modify + our userdata to setup a single partition on the disk. + + This allows cloud-init to attempt disk setup on a mounted partition. + When blockdev is in use, it will fail with + "blockdev: ioctl error on BLKRRPART: Device or resource busy" along + with a warning and a traceback. When partprobe is in use, everything + should work successfully. + """ + log = client.read_from_file("/var/log/cloud-init.log") + self._verify_first_disk_setup(client, log) + + # Update our userdata and cloud.cfg to mount then perform new disk + # setup + client.write_to_file( + "/var/lib/cloud/seed/nocloud-net/user-data", + UPDATED_PARTPROBE_USERDATA, + ) + client.execute( + "sed -i 's/write-files/write-files\\n - mounts/' " + "/etc/cloud/cloud.cfg" + ) + + client.execute("cloud-init clean --logs") + client.restart() + + # Assert new setup works as expected + verify_clean_log(log) + + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 1 + assert sdb["children"][0]["name"] == "sdb1" + if "mountpoint" in sdb["children"][0]: + assert sdb["children"][0]["mountpoint"] == "/mnt3" + else: + assert sdb["children"][0]["mountpoints"] == ["/mnt3"] + + def test_disk_setup_no_partprobe( + self, create_disk, client: IntegrationInstance + ): + """Ensure disk setup still works as expected without partprobe.""" + # We can't do this part in a bootcmd because the path has already + # been found by the time we get to the bootcmd + client.execute("rm $(which partprobe)") + client.execute("cloud-init clean --logs") + client.restart() + + log = client.read_from_file("/var/log/cloud-init.log") + self._verify_first_disk_setup(client, log) + + assert "partprobe" not in log diff --git a/tests/integration_tests/modules/test_growpart.py b/tests/integration_tests/modules/test_growpart.py new file mode 100644 index 00000000..67251817 --- /dev/null +++ b/tests/integration_tests/modules/test_growpart.py @@ -0,0 +1,68 @@ +import json +import os +import pathlib +from uuid import uuid4 + +import pytest +from pycloudlib.lxd.instance import LXDInstance + +from cloudinit.subp import subp +from tests.integration_tests.instances import IntegrationInstance + +DISK_PATH = "/tmp/test_disk_setup_{}".format(uuid4()) + + +def setup_and_mount_lxd_disk(instance: LXDInstance): + subp( + "lxc config device add {} test-disk-setup-disk disk source={}".format( + instance.name, DISK_PATH + ).split() + ) + + +@pytest.fixture(scope="class", autouse=True) +def create_disk(): + """Create 16M sparse file""" + pathlib.Path(DISK_PATH).touch() + os.truncate(DISK_PATH, 1 << 24) + yield + os.remove(DISK_PATH) + + +# Create undersized partition in bootcmd +ALIAS_USERDATA = """\ +#cloud-config +bootcmd: + - parted /dev/sdb --script \ + mklabel gpt \ + mkpart primary 0 1MiB + - parted /dev/sdb --script print +growpart: + devices: + - "/" + - "/dev/sdb1" +runcmd: + - parted /dev/sdb --script print +""" + + +@pytest.mark.user_data(ALIAS_USERDATA) +@pytest.mark.lxd_setup.with_args(setup_and_mount_lxd_disk) +@pytest.mark.ubuntu +@pytest.mark.lxd_vm +class TestGrowPart: + """Test growpart""" + + def test_grow_part(self, client: IntegrationInstance): + """Verify""" + log = client.read_from_file("/var/log/cloud-init.log") + assert ( + "cc_growpart.py[INFO]: '/dev/sdb1' resized:" + " changed (/dev/sdb, 1) from" in log + ) + + lsblk = json.loads(client.execute("lsblk --json")) + sdb = [x for x in lsblk["blockdevices"] if x["name"] == "sdb"][0] + assert len(sdb["children"]) == 1 + assert sdb["children"][0]["name"] == "sdb1" + assert sdb["size"] == "16M" diff --git a/tests/integration_tests/modules/test_hotplug.py b/tests/integration_tests/modules/test_hotplug.py new file mode 100644 index 00000000..0bad761e --- /dev/null +++ b/tests/integration_tests/modules/test_hotplug.py @@ -0,0 +1,112 @@ +import time +from collections import namedtuple + +import pytest +import yaml + +from tests.integration_tests.instances import IntegrationInstance + +USER_DATA = """\ +#cloud-config +updates: + network: + when: ['hotplug'] +""" + +ip_addr = namedtuple("ip_addr", "interface state ip4 ip6") + + +def _wait_till_hotplug_complete(client, expected_runs=1): + for _ in range(60): + log = client.read_from_file("/var/log/cloud-init.log") + if log.count("Exiting hotplug handler") == expected_runs: + return log + time.sleep(1) + raise Exception("Waiting for hotplug handler failed") + + +def _get_ip_addr(client): + ips = [] + lines = client.execute("ip --brief addr").split("\n") + for line in lines: + attributes = line.split() + interface, state = attributes[0], attributes[1] + ip4_cidr = attributes[2] if len(attributes) > 2 else None + ip6_cidr = attributes[3] if len(attributes) > 3 else None + ip4 = ip4_cidr.split("/")[0] if ip4_cidr else None + ip6 = ip6_cidr.split("/")[0] if ip6_cidr else None + ip = ip_addr(interface, state, ip4, ip6) + ips.append(ip) + return ips + + +@pytest.mark.openstack +# On Bionic, we traceback when attempting to detect the hotplugged +# device in the updated metadata. This is because Bionic is specifically +# configured not to provide network metadata. +@pytest.mark.not_bionic +@pytest.mark.user_data(USER_DATA) +def test_hotplug_add_remove(client: IntegrationInstance): + ips_before = _get_ip_addr(client) + log = client.read_from_file("/var/log/cloud-init.log") + assert "Exiting hotplug handler" not in log + assert client.execute( + "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules" + ).ok + + # Add new NIC + added_ip = client.instance.add_network_interface() + _wait_till_hotplug_complete(client, expected_runs=1) + ips_after_add = _get_ip_addr(client) + new_addition = [ip for ip in ips_after_add if ip.ip4 == added_ip][0] + + assert len(ips_after_add) == len(ips_before) + 1 + assert added_ip not in [ip.ip4 for ip in ips_before] + assert added_ip in [ip.ip4 for ip in ips_after_add] + assert new_addition.state == "UP" + + netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml") + config = yaml.safe_load(netplan_cfg) + assert new_addition.interface in config["network"]["ethernets"] + + # Remove new NIC + client.instance.remove_network_interface(added_ip) + _wait_till_hotplug_complete(client, expected_runs=2) + ips_after_remove = _get_ip_addr(client) + assert len(ips_after_remove) == len(ips_before) + assert added_ip not in [ip.ip4 for ip in ips_after_remove] + + netplan_cfg = client.read_from_file("/etc/netplan/50-cloud-init.yaml") + config = yaml.safe_load(netplan_cfg) + assert new_addition.interface not in config["network"]["ethernets"] + + assert "enabled" == client.execute( + "cloud-init devel hotplug-hook -s net query" + ) + + +@pytest.mark.openstack +def test_no_hotplug_in_userdata(client: IntegrationInstance): + ips_before = _get_ip_addr(client) + log = client.read_from_file("/var/log/cloud-init.log") + assert "Exiting hotplug handler" not in log + assert client.execute( + "test -f /etc/udev/rules.d/10-cloud-init-hook-hotplug.rules" + ).failed + + # Add new NIC + client.instance.add_network_interface() + log = client.read_from_file("/var/log/cloud-init.log") + assert "hotplug-hook" not in log + + ips_after_add = _get_ip_addr(client) + if len(ips_after_add) == len(ips_before) + 1: + # We can see the device, but it should not have been brought up + new_ip = [ip for ip in ips_after_add if ip not in ips_before][0] + assert new_ip.state == "DOWN" + else: + assert len(ips_after_add) == len(ips_before) + + assert "disabled" == client.execute( + "cloud-init devel hotplug-hook -s net query" + ) diff --git a/tests/integration_tests/modules/test_jinja_templating.py b/tests/integration_tests/modules/test_jinja_templating.py new file mode 100644 index 00000000..7788c6f0 --- /dev/null +++ b/tests/integration_tests/modules/test_jinja_templating.py @@ -0,0 +1,33 @@ +# This file is part of cloud-init. See LICENSE file for license information. +import pytest + +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.util import verify_ordered_items_in_text + +USER_DATA = """\ +## template: jinja +#cloud-config +runcmd: + - echo {{v1.local_hostname}} > /var/tmp/runcmd_output + - echo {{merged_cfg._doc}} >> /var/tmp/runcmd_output + - echo {{v1['local-hostname']}} >> /var/tmp/runcmd_output +""" + + +@pytest.mark.user_data(USER_DATA) +def test_runcmd_with_variable_substitution(client: IntegrationInstance): + """Test jinja substitution. + + Ensure underscore-delimited aliases exist for hyphenated key and + we can also substitute variables from instance-data-sensitive + LP: #1931392. + """ + hostname = client.execute("hostname").stdout.strip() + expected = [ + hostname, + "Merged cloud-init system config from /etc/cloud/cloud.cfg and " + "/etc/cloud/cloud.cfg.d/", + hostname, + ] + output = client.read_from_file("/var/tmp/runcmd_output") + verify_ordered_items_in_text(expected, output) diff --git a/tests/integration_tests/modules/test_keyboard.py b/tests/integration_tests/modules/test_keyboard.py new file mode 100644 index 00000000..7db35014 --- /dev/null +++ b/tests/integration_tests/modules/test_keyboard.py @@ -0,0 +1,17 @@ +import pytest + +USER_DATA = """\ +#cloud-config +keyboard: + layout: de + model: pc105 + variant: nodeadkeys + options: compose:rwin +""" + + +class TestKeyboard: + @pytest.mark.user_data(USER_DATA) + def test_keyboard(self, client): + lc = client.execute("localectl") + assert "X11 Layout: de" in lc diff --git a/tests/integration_tests/modules/test_keys_to_console.py b/tests/integration_tests/modules/test_keys_to_console.py new file mode 100644 index 00000000..50899982 --- /dev/null +++ b/tests/integration_tests/modules/test_keys_to_console.py @@ -0,0 +1,113 @@ +"""Integration tests for the cc_keys_to_console module. + +(This is ported from +``tests/cloud_tests/testcases/modules/keys_to_console.yaml``.)""" +import pytest + +from tests.integration_tests.util import retry + +BLACKLIST_USER_DATA = """\ +#cloud-config +ssh_fp_console_blacklist: [ssh-dss, ssh-dsa, ecdsa-sha2-nistp256] +ssh_key_console_blacklist: [ssh-dss, ssh-dsa, ecdsa-sha2-nistp256] +""" + +BLACKLIST_ALL_KEYS_USER_DATA = """\ +#cloud-config +ssh_fp_console_blacklist: [ssh-dsa, ssh-ecdsa, ssh-ed25519, ssh-rsa, ssh-dss, ecdsa-sha2-nistp256] +""" # noqa: E501 + +DISABLED_USER_DATA = """\ +#cloud-config +ssh: + emit_keys_to_console: false +""" + +ENABLE_KEYS_TO_CONSOLE_USER_DATA = """\ +#cloud-config +ssh: + emit_keys_to_console: true +users: + - default + - name: barfoo +""" + + +@pytest.mark.user_data(BLACKLIST_USER_DATA) +class TestKeysToConsoleBlacklist: + """Test that the blacklist options work as expected.""" + + @pytest.mark.parametrize("key_type", ["DSA", "ECDSA"]) + def test_excluded_keys(self, class_client, key_type): + syslog = class_client.read_from_file("/var/log/syslog") + assert "({})".format(key_type) not in syslog + + # retry decorator here because it can take some time to be reflected + # in syslog + @retry(tries=30, delay=1) + @pytest.mark.parametrize("key_type", ["ED25519", "RSA"]) + def test_included_keys(self, class_client, key_type): + syslog = class_client.read_from_file("/var/log/syslog") + assert "({})".format(key_type) in syslog + + +@pytest.mark.user_data(BLACKLIST_ALL_KEYS_USER_DATA) +class TestAllKeysToConsoleBlacklist: + """Test that when key blacklist contains all key types that + no header/footer are output. + """ + + def test_header_excluded(self, class_client): + syslog = class_client.read_from_file("/var/log/syslog") + assert "BEGIN SSH HOST KEY FINGERPRINTS" not in syslog + + def test_footer_excluded(self, class_client): + syslog = class_client.read_from_file("/var/log/syslog") + assert "END SSH HOST KEY FINGERPRINTS" not in syslog + + +@pytest.mark.user_data(DISABLED_USER_DATA) +class TestKeysToConsoleDisabled: + """Test that output can be fully disabled.""" + + @pytest.mark.parametrize("key_type", ["DSA", "ECDSA", "ED25519", "RSA"]) + def test_keys_excluded(self, class_client, key_type): + syslog = class_client.read_from_file("/var/log/syslog") + assert "({})".format(key_type) not in syslog + + def test_header_excluded(self, class_client): + syslog = class_client.read_from_file("/var/log/syslog") + assert "BEGIN SSH HOST KEY FINGERPRINTS" not in syslog + + def test_footer_excluded(self, class_client): + syslog = class_client.read_from_file("/var/log/syslog") + assert "END SSH HOST KEY FINGERPRINTS" not in syslog + + +@pytest.mark.user_data(ENABLE_KEYS_TO_CONSOLE_USER_DATA) +@pytest.mark.ec2 +@pytest.mark.lxd_container +@pytest.mark.oci +@pytest.mark.openstack +class TestKeysToConsoleEnabled: + """Test that output can be enabled disabled.""" + + def test_duplicate_messaging_console_log(self, class_client): + class_client.execute("cloud-init status --wait --long").ok + try: + console_log = class_client.instance.console_log() + except NotImplementedError: + # Assume that an exception here means that we can't use the console + # log + pytest.skip("NotImplementedError when requesting console log") + return + if console_log.lower() == "no console output": + # This test retries because we might not have the full console log + # on the first fetch. However, if we have no console output + # at all, we don't want to keep retrying as that would trigger + # another 5 minute wait on the pycloudlib side, which could + # leave us waiting for a couple hours + pytest.fail("no console output") + return + msg = "no authorized SSH keys fingerprints found for user barfoo." + assert 1 == console_log.count(msg) diff --git a/tests/integration_tests/modules/test_lxd_bridge.py b/tests/integration_tests/modules/test_lxd_bridge.py new file mode 100644 index 00000000..3292a833 --- /dev/null +++ b/tests/integration_tests/modules/test_lxd_bridge.py @@ -0,0 +1,46 @@ +"""Integration tests for LXD bridge creation. + +(This is ported from +``tests/cloud_tests/testcases/modules/lxd_bridge.yaml``.) +""" +import pytest +import yaml + +from tests.integration_tests.util import verify_clean_log + +USER_DATA = """\ +#cloud-config +lxd: + init: + storage_backend: dir + bridge: + mode: new + name: lxdbr0 + ipv4_address: 10.100.100.1 + ipv4_netmask: 24 + ipv4_dhcp_first: 10.100.100.100 + ipv4_dhcp_last: 10.100.100.200 + ipv4_nat: true + domain: lxd +""" + + +@pytest.mark.no_container +@pytest.mark.user_data(USER_DATA) +class TestLxdBridge: + @pytest.mark.parametrize("binary_name", ["lxc", "lxd"]) + def test_binaries_installed(self, class_client, binary_name): + """Check that the expected LXD binaries are installed""" + assert class_client.execute(["which", binary_name]).ok + + def test_bridge(self, class_client): + """Check that the given bridge is configured""" + cloud_init_log = class_client.read_from_file("/var/log/cloud-init.log") + verify_clean_log(cloud_init_log) + + # The bridge should exist + assert class_client.execute("ip addr show lxdbr0") + + raw_network_config = class_client.execute("lxc network show lxdbr0") + network_config = yaml.safe_load(raw_network_config) + assert "10.100.100.1/24" == network_config["config"]["ipv4.address"] diff --git a/tests/integration_tests/modules/test_ntp_servers.py b/tests/integration_tests/modules/test_ntp_servers.py index e72389c1..fc62e63b 100644 --- a/tests/integration_tests/modules/test_ntp_servers.py +++ b/tests/integration_tests/modules/test_ntp_servers.py @@ -1,14 +1,18 @@ -"""Integration test for the ntp module's ``servers`` functionality with ntp. +"""Integration test for the ntp module's ntp functionality. This test specifies the use of the `ntp` NTP client, and ensures that the given NTP servers are configured as expected. -(This is ported from ``tests/cloud_tests/testcases/modules/ntp_servers.yaml``.) +(This is ported from ``tests/cloud_tests/testcases/modules/ntp_servers.yaml``, +``tests/cloud_tests/testcases/modules/ntp_pools.yaml``, +and ``tests/cloud_tests/testcases/modules/ntp_chrony.yaml``) """ import re -import yaml import pytest +import yaml + +from tests.integration_tests.instances import IntegrationInstance USER_DATA = """\ #cloud-config @@ -17,21 +21,25 @@ ntp: servers: - 172.16.15.14 - 172.16.17.18 + pools: + - 0.cloud-init.mypool + - 1.cloud-init.mypool + - 172.16.15.15 """ EXPECTED_SERVERS = yaml.safe_load(USER_DATA)["ntp"]["servers"] +EXPECTED_POOLS = yaml.safe_load(USER_DATA)["ntp"]["pools"] -@pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestNtpServers: - - def test_ntp_installed(self, class_client): + def test_ntp_installed(self, class_client: IntegrationInstance): """Test that `ntpd --version` succeeds, indicating installation.""" - result = class_client.execute("ntpd --version") - assert 0 == result.return_code + assert class_client.execute("ntpd --version").ok - def test_dist_config_file_is_empty(self, class_client): + def test_dist_config_file_is_empty( + self, class_client: IntegrationInstance + ): """Test that the distributed config file is empty. (This test is skipped on all currently supported Ubuntu releases, so @@ -42,17 +50,79 @@ class TestNtpServers: dist_file = class_client.read_from_file("/etc/ntp.conf.dist") assert 0 == len(dist_file.strip().splitlines()) - def test_ntp_entries(self, class_client): + def test_ntp_entries(self, class_client: IntegrationInstance): ntp_conf = class_client.read_from_file("/etc/ntp.conf") for expected_server in EXPECTED_SERVERS: assert re.search( r"^server {} iburst".format(expected_server), ntp_conf, - re.MULTILINE + re.MULTILINE, + ) + for expected_pool in EXPECTED_POOLS: + assert re.search( + r"^pool {} iburst".format(expected_pool), + ntp_conf, + re.MULTILINE, ) - def test_ntpq_servers(self, class_client): + def test_ntpq_servers(self, class_client: IntegrationInstance): result = class_client.execute("ntpq -p -w -n") assert result.ok - for expected_server in EXPECTED_SERVERS: - assert expected_server in result.stdout + for expected_server_or_pool in [*EXPECTED_SERVERS, *EXPECTED_POOLS]: + assert expected_server_or_pool in result.stdout + + +CHRONY_DATA = """\ +#cloud-config +ntp: + enabled: true + ntp_client: chrony + servers: + - 172.16.15.14 +""" + + +@pytest.mark.user_data(CHRONY_DATA) +def test_chrony(client: IntegrationInstance): + if client.execute("test -f /etc/chrony.conf").ok: + chrony_conf = "/etc/chrony.conf" + else: + chrony_conf = "/etc/chrony/chrony.conf" + contents = client.read_from_file(chrony_conf) + assert "server 172.16.15.14" in contents + + +TIMESYNCD_DATA = """\ +#cloud-config +ntp: + enabled: true + ntp_client: systemd-timesyncd + servers: + - 172.16.15.14 +""" + + +@pytest.mark.user_data(TIMESYNCD_DATA) +def test_timesyncd(client: IntegrationInstance): + contents = client.read_from_file( + "/etc/systemd/timesyncd.conf.d/cloud-init.conf" + ) + assert "NTP=172.16.15.14" in contents + + +EMPTY_NTP = """\ +#cloud-config +ntp: + ntp_client: ntp + pools: [] + servers: [] +""" + + +@pytest.mark.user_data(EMPTY_NTP) +def test_empty_ntp(client: IntegrationInstance): + assert client.execute("ntpd --version").ok + assert client.execute("test -f /etc/ntp.conf.dist").failed + assert "pool.ntp.org iburst" in client.execute( + 'grep -v "^#" /etc/ntp.conf' + ) diff --git a/tests/integration_tests/modules/test_package_update_upgrade_install.py b/tests/integration_tests/modules/test_package_update_upgrade_install.py index 8a38ad84..d668d81c 100644 --- a/tests/integration_tests/modules/test_package_update_upgrade_install.py +++ b/tests/integration_tests/modules/test_package_update_upgrade_install.py @@ -13,8 +13,8 @@ NOTE: the testcase for this looks for the command in history.log as """ import re -import pytest +import pytest USER_DATA = """\ #cloud-config @@ -26,9 +26,9 @@ package_upgrade: true """ +@pytest.mark.ubuntu @pytest.mark.user_data(USER_DATA) class TestPackageUpdateUpgradeInstall: - def assert_package_installed(self, pkg_out, name, version=None): """Check dpkg-query --show output for matching package name. @@ -37,7 +37,8 @@ class TestPackageUpdateUpgradeInstall: version. """ pkg_match = re.search( - "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE) + "^%s\t(?P<version>.*)$" % name, pkg_out, re.MULTILINE + ) if pkg_match: installed_version = pkg_match.group("version") if not version: @@ -45,8 +46,10 @@ class TestPackageUpdateUpgradeInstall: if installed_version.startswith(version): return # Success raise AssertionError( - "Expected package version %s-%s not found. Found %s" % - name, version, installed_version) + "Expected package version %s-%s not found. Found %s" % name, + version, + installed_version, + ) raise AssertionError("Package not installed: %s" % name) def test_new_packages_are_installed(self, class_client): @@ -57,11 +60,13 @@ class TestPackageUpdateUpgradeInstall: def test_packages_were_updated(self, class_client): out = class_client.execute( - "grep ^Commandline: /var/log/apt/history.log") + "grep ^Commandline: /var/log/apt/history.log" + ) assert ( "Commandline: /usr/bin/apt-get --option=Dpkg::Options" "::=--force-confold --option=Dpkg::options::=--force-unsafe-io " - "--assume-yes --quiet install sl tree") in out + "--assume-yes --quiet install sl tree" in out + ) def test_packages_were_upgraded(self, class_client): """Test cloud-init-output for install & upgrade stuff.""" diff --git a/tests/integration_tests/modules/test_persistence.py b/tests/integration_tests/modules/test_persistence.py new file mode 100644 index 00000000..33527e1e --- /dev/null +++ b/tests/integration_tests/modules/test_persistence.py @@ -0,0 +1,32 @@ +# This file is part of cloud-init. See LICENSE file for license information. +"""Test the behavior of loading/discarding pickle data""" +from pathlib import Path + +import pytest + +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.util import ( + ASSETS_DIR, + verify_ordered_items_in_text, +) + +PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl") +TEST_PICKLE = ASSETS_DIR / "trusty_with_mime.pkl" + + +@pytest.mark.lxd_container +def test_log_message_on_missing_version_file(client: IntegrationInstance): + client.push_file(TEST_PICKLE, PICKLE_PATH) + client.restart() + assert client.execute("cloud-init status --wait").ok + log = client.read_from_file("/var/log/cloud-init.log") + verify_ordered_items_in_text( + [ + "Unable to unpickle datasource: 'MIMEMultipart' object has no " + "attribute 'policy'. Ignoring current cache.", + "no cache found", + "Searching for local data source", + "SUCCESS: found local data from DataSourceNoCloud", + ], + log, + ) diff --git a/tests/integration_tests/modules/test_power_state_change.py b/tests/integration_tests/modules/test_power_state_change.py new file mode 100644 index 00000000..5cd19764 --- /dev/null +++ b/tests/integration_tests/modules/test_power_state_change.py @@ -0,0 +1,97 @@ +"""Integration test of the cc_power_state_change module. + +Test that the power state config options work as expected. +""" + +import time + +import pytest + +from tests.integration_tests.clouds import IntegrationCloud +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.util import verify_ordered_items_in_text + +USER_DATA = """\ +#cloud-config +power_state: + delay: {delay} + mode: {mode} + message: msg + timeout: {timeout} + condition: {condition} +""" + + +def _detect_reboot(instance: IntegrationInstance): + # We'll wait for instance up here, but we don't know if we're + # detecting the first boot or second boot, so we also check + # the logs to ensure we've booted twice. If the logs show we've + # only booted once, wait until we've booted twice + instance.instance.wait() + for _ in range(600): + try: + log = instance.read_from_file("/var/log/cloud-init.log") + boot_count = log.count("running 'init-local'") + if boot_count == 1: + instance.instance.wait() + elif boot_count > 1: + break + except Exception: + pass + time.sleep(1) + else: + raise Exception("Could not detect reboot") + + +def _can_connect(instance): + return instance.execute("true").ok + + +# This test is marked unstable because even though it should be able to +# run anywhere, I can only get it to run in an lxd container, and even then +# occasionally some timing issues will crop up. +@pytest.mark.unstable +@pytest.mark.ubuntu +@pytest.mark.lxd_container +class TestPowerChange: + @pytest.mark.parametrize( + "mode,delay,timeout,expected", + [ + ("poweroff", "now", "10", "will execute: shutdown -P now msg"), + ("reboot", "now", "0", "will execute: shutdown -r now msg"), + ("halt", "+1", "0", "will execute: shutdown -H +1 msg"), + ], + ) + def test_poweroff( + self, session_cloud: IntegrationCloud, mode, delay, timeout, expected + ): + with session_cloud.launch( + user_data=USER_DATA.format( + delay=delay, mode=mode, timeout=timeout, condition="true" + ), + launch_kwargs={"wait": False}, + ) as instance: + if mode == "reboot": + _detect_reboot(instance) + else: + instance.instance.wait_for_stop() + instance.instance.start(wait=True) + log = instance.read_from_file("/var/log/cloud-init.log") + assert _can_connect(instance) + lines_to_check = [ + "Running module power-state-change", + expected, + "running 'init-local'", + "config-power-state-change already ran", + ] + verify_ordered_items_in_text(lines_to_check, log) + + @pytest.mark.user_data( + USER_DATA.format( + delay="0", mode="poweroff", timeout="0", condition="false" + ) + ) + def test_poweroff_false_condition(self, client: IntegrationInstance): + log = client.read_from_file("/var/log/cloud-init.log") + assert _can_connect(client) + assert "Condition was false. Will not perform state change" in log diff --git a/tests/integration_tests/modules/test_puppet.py b/tests/integration_tests/modules/test_puppet.py new file mode 100644 index 00000000..1bd9cee4 --- /dev/null +++ b/tests/integration_tests/modules/test_puppet.py @@ -0,0 +1,39 @@ +"""Test installation configuration of puppet module.""" +import pytest + +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.util import verify_clean_log + +SERVICE_DATA = """\ +#cloud-config +puppet: + install: true + install_type: packages +""" + + +@pytest.mark.user_data(SERVICE_DATA) +def test_puppet_service(client: IntegrationInstance): + """Basic test that puppet gets installed and runs.""" + log = client.read_from_file("/var/log/cloud-init.log") + verify_clean_log(log) + assert client.execute("systemctl is-active puppet").ok + assert "Running command ['puppet', 'agent'" not in log + + +EXEC_DATA = """\ +#cloud-config +puppet: + install: true + install_type: packages + exec: true + exec_args: ['--noop'] +""" + + +@pytest.mark.user_data +@pytest.mark.user_data(EXEC_DATA) +def test_pupet_exec(client: IntegrationInstance): + """Basic test that puppet gets installed and runs.""" + log = client.read_from_file("/var/log/cloud-init.log") + assert "Running command ['puppet', 'agent', '--noop']" in log diff --git a/tests/integration_tests/modules/test_runcmd.py b/tests/integration_tests/modules/test_runcmd.py deleted file mode 100644 index 50d1851e..00000000 --- a/tests/integration_tests/modules/test_runcmd.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Integration test for the runcmd module. - -This test specifies a command to be executed by the ``runcmd`` module -and then checks if that command was executed during boot. - -(This is ported from -``tests/cloud_tests/testcases/modules/runcmd.yaml``.)""" - -import pytest - - -USER_DATA = """\ -#cloud-config -runcmd: - - echo cloud-init run cmd test > /var/tmp/run_cmd -""" - - -@pytest.mark.ci -class TestRuncmd: - - @pytest.mark.user_data(USER_DATA) - def test_runcmd(self, client): - runcmd_output = client.read_from_file("/var/tmp/run_cmd") - assert runcmd_output.strip() == "cloud-init run cmd test" diff --git a/tests/integration_tests/modules/test_seed_random_data.py b/tests/integration_tests/modules/test_seed_random_data.py deleted file mode 100644 index b365fa98..00000000 --- a/tests/integration_tests/modules/test_seed_random_data.py +++ /dev/null @@ -1,28 +0,0 @@ -"""Integration test for the random seed module. - -This test specifies a command to be executed by the ``seed_random`` module, by -providing a different data to be used as seed data. We will then check -if that seed data was actually used. - -(This is ported from -``tests/cloud_tests/testcases/modules/seed_random_data.yaml``.)""" - -import pytest - - -USER_DATA = """\ -#cloud-config -random_seed: - data: 'MYUb34023nD:LFDK10913jk;dfnk:Df' - encoding: raw - file: /root/seed -""" - - -@pytest.mark.ci -class TestSeedRandomData: - - @pytest.mark.user_data(USER_DATA) - def test_seed_random_data(self, client): - seed_output = client.read_from_file("/root/seed") - assert seed_output.strip() == "MYUb34023nD:LFDK10913jk;dfnk:Df" diff --git a/tests/integration_tests/modules/test_set_hostname.py b/tests/integration_tests/modules/test_set_hostname.py index 2bfa403d..ae0aeae9 100644 --- a/tests/integration_tests/modules/test_set_hostname.py +++ b/tests/integration_tests/modules/test_set_hostname.py @@ -11,7 +11,6 @@ after the system is boot. import pytest - USER_DATA_HOSTNAME = """\ #cloud-config hostname: cloudinit2 @@ -24,15 +23,31 @@ hostname: cloudinit1 fqdn: cloudinit2.i9n.cloud-init.io """ +USER_DATA_PREFER_FQDN = """\ +#cloud-config +prefer_fqdn_over_hostname: {} +hostname: cloudinit1 +fqdn: cloudinit2.test.io +""" + @pytest.mark.ci class TestHostname: - @pytest.mark.user_data(USER_DATA_HOSTNAME) def test_hostname(self, client): hostname_output = client.execute("hostname") assert "cloudinit2" in hostname_output.strip() + @pytest.mark.user_data(USER_DATA_PREFER_FQDN.format(True)) + def test_prefer_fqdn(self, client): + hostname_output = client.execute("hostname") + assert "cloudinit2.test.io" in hostname_output.strip() + + @pytest.mark.user_data(USER_DATA_PREFER_FQDN.format(False)) + def test_prefer_short_hostname(self, client): + hostname_output = client.execute("hostname") + assert "cloudinit1" in hostname_output.strip() + @pytest.mark.user_data(USER_DATA_FQDN) def test_hostname_and_fqdn(self, client): hostname_output = client.execute("hostname") @@ -42,6 +57,8 @@ class TestHostname: assert "cloudinit2.i9n.cloud-init.io" in fqdn_output.strip() host_output = client.execute("grep ^127 /etc/hosts") - assert '127.0.1.1 {} {}'.format( - fqdn_output, hostname_output) in host_output - assert '127.0.0.1 localhost' in host_output + assert ( + "127.0.1.1 {} {}".format(fqdn_output, hostname_output) + in host_output + ) + assert "127.0.0.1 localhost" in host_output diff --git a/tests/integration_tests/modules/test_set_password.py b/tests/integration_tests/modules/test_set_password.py index b13f76fb..0e35cd26 100644 --- a/tests/integration_tests/modules/test_set_password.py +++ b/tests/integration_tests/modules/test_set_password.py @@ -8,11 +8,10 @@ other tests chpasswd's list being a string. Both expect the same results, so they use a mixin to share their test definitions, because we can (of course) only specify one user-data per instance. """ -import crypt - import pytest import yaml +from tests.integration_tests.util import retry COMMON_USER_DATA = """\ #cloud-config @@ -40,7 +39,9 @@ Uh69tP4GSrGW5XKHxMLiKowJgm/" lock_passwd: false """ -LIST_USER_DATA = COMMON_USER_DATA + """ +LIST_USER_DATA = ( + COMMON_USER_DATA + + """ chpasswd: list: - tom:mypassword123! @@ -48,8 +49,11 @@ chpasswd: - harry:RANDOM - mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89 """ +) -STRING_USER_DATA = COMMON_USER_DATA + """ +STRING_USER_DATA = ( + COMMON_USER_DATA + + """ chpasswd: list: | tom:mypassword123! @@ -57,6 +61,7 @@ chpasswd: harry:RANDOM mikey:$5$xZ$B2YGGEx2AOf4PeW48KC6.QyT1W2B4rZ9Qbltudtha89 """ +) USERS_DICTS = yaml.safe_load(COMMON_USER_DATA)["users"] USERS_PASSWD_VALUES = { @@ -116,14 +121,52 @@ class Mixin: # Which are not the same assert shadow_users["harry"] != shadow_users["dick"] + def test_random_passwords_not_stored_in_cloud_init_output_log( + self, class_client + ): + """We should not emit passwords to the in-instance log file. + + LP: #1918303 + """ + cloud_init_output = class_client.read_from_file( + "/var/log/cloud-init-output.log" + ) + assert "dick:" not in cloud_init_output + assert "harry:" not in cloud_init_output + + @retry(tries=30, delay=1) + def test_random_passwords_emitted_to_serial_console(self, class_client): + """We should emit passwords to the serial console. (LP: #1918303)""" + try: + console_log = class_client.instance.console_log() + except NotImplementedError: + # Assume that an exception here means that we can't use the console + # log + pytest.skip("NotImplementedError when requesting console log") + return + if console_log.lower() == "no console output": + # This test retries because we might not have the full console log + # on the first fetch. However, if we have no console output + # at all, we don't want to keep retrying as that would trigger + # another 5 minute wait on the pycloudlib side, which could + # leave us waiting for a couple hours + pytest.fail("no console output") + return + assert "dick:" in console_log + assert "harry:" in console_log + def test_explicit_password_set_correctly(self, class_client): """Test that an explicitly-specified password is set correctly.""" shadow_users, _ = self._fetch_and_parse_etc_shadow(class_client) fmt_and_salt = shadow_users["tom"].rsplit("$", 1)[0] - expected_value = crypt.crypt("mypassword123!", fmt_and_salt) - - assert expected_value == shadow_users["tom"] + GEN_CRYPT_CONTENT = ( + "import crypt\n" + f"print(crypt.crypt('mypassword123!', '{fmt_and_salt}'))\n" + ) + class_client.write_to_file("/gen_crypt.py", GEN_CRYPT_CONTENT) + result = class_client.execute("python3 /gen_crypt.py") + assert result.stdout == shadow_users["tom"] def test_shadow_expected_users(self, class_client): """Test that the right set of users is in /etc/shadow.""" diff --git a/tests/integration_tests/modules/test_snap.py b/tests/integration_tests/modules/test_snap.py deleted file mode 100644 index b626f6b0..00000000 --- a/tests/integration_tests/modules/test_snap.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Integration test for the snap module. - -This test specifies a command to be executed by the ``snap`` module -and then checks that if that command was executed during boot. - -(This is ported from -``tests/cloud_tests/testcases/modules/runcmd.yaml``.)""" - -import pytest - - -USER_DATA = """\ -#cloud-config -package_update: true -snap: - squashfuse_in_container: true - commands: - - snap install hello-world -""" - - -@pytest.mark.ci -class TestSnap: - - @pytest.mark.user_data(USER_DATA) - def test_snap(self, client): - snap_output = client.execute("snap list") - assert "core " in snap_output - assert "hello-world " in snap_output diff --git a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py index b9b0d85e..89b49576 100644 --- a/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py +++ b/tests/integration_tests/modules/test_ssh_auth_key_fingerprints.py @@ -12,13 +12,14 @@ import re import pytest +from tests.integration_tests.util import retry USER_DATA_SSH_AUTHKEY_DISABLE = """\ #cloud-config no_ssh_fingerprints: true """ -USER_DATA_SSH_AUTHKEY_ENABLE="""\ +USER_DATA_SSH_AUTHKEY_ENABLE = """\ #cloud-config ssh_genkeytypes: - ecdsa @@ -30,19 +31,22 @@ ssh_authorized_keys: @pytest.mark.ci class TestSshAuthkeyFingerprints: - @pytest.mark.user_data(USER_DATA_SSH_AUTHKEY_DISABLE) def test_ssh_authkey_fingerprints_disable(self, client): cloudinit_output = client.read_from_file("/var/log/cloud-init.log") assert ( "Skipping module named ssh-authkey-fingerprints, " - "logging of SSH fingerprints disabled") in cloudinit_output + "logging of SSH fingerprints disabled" in cloudinit_output + ) + # retry decorator here because it can take some time to be reflected + # in syslog + @retry(tries=30, delay=1) @pytest.mark.user_data(USER_DATA_SSH_AUTHKEY_ENABLE) def test_ssh_authkey_fingerprints_enable(self, client): syslog_output = client.read_from_file("/var/log/syslog") - assert re.search(r'256 SHA256:.*(ECDSA)', syslog_output) is not None - assert re.search(r'256 SHA256:.*(ED25519)', syslog_output) is not None - assert re.search(r'1024 SHA256:.*(DSA)', syslog_output) is None - assert re.search(r'2048 SHA256:.*(RSA)', syslog_output) is None + assert re.search(r"256 SHA256:.*(ECDSA)", syslog_output) is not None + assert re.search(r"256 SHA256:.*(ED25519)", syslog_output) is not None + assert re.search(r"1024 SHA256:.*(DSA)", syslog_output) is None + assert re.search(r"2048 SHA256:.*(RSA)", syslog_output) is None diff --git a/tests/integration_tests/modules/test_ssh_generate.py b/tests/integration_tests/modules/test_ssh_generate.py index 60c36982..1dd0adf1 100644 --- a/tests/integration_tests/modules/test_ssh_generate.py +++ b/tests/integration_tests/modules/test_ssh_generate.py @@ -10,7 +10,6 @@ keys were created. import pytest - USER_DATA = """\ #cloud-config ssh_genkeytypes: @@ -23,28 +22,27 @@ authkey_hash: sha512 @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestSshKeysGenerate: - @pytest.mark.parametrize( - "ssh_key_path", ( + "ssh_key_path", + ( "/etc/ssh/ssh_host_dsa_key.pub", "/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_rsa_key.pub", "/etc/ssh/ssh_host_rsa_key", - ) + ), ) def test_ssh_keys_not_generated(self, ssh_key_path, class_client): - out = class_client.execute( - "test -e {}".format(ssh_key_path) - ) + out = class_client.execute("test -e {}".format(ssh_key_path)) assert out.failed @pytest.mark.parametrize( - "ssh_key_path", ( + "ssh_key_path", + ( "/etc/ssh/ssh_host_ecdsa_key.pub", "/etc/ssh/ssh_host_ecdsa_key", "/etc/ssh/ssh_host_ed25519_key.pub", "/etc/ssh/ssh_host_ed25519_key", - ) + ), ) def test_ssh_keys_generated(self, ssh_key_path, class_client): out = class_client.read_from_file(ssh_key_path) diff --git a/tests/integration_tests/modules/test_ssh_import_id.py b/tests/integration_tests/modules/test_ssh_import_id.py deleted file mode 100644 index 45d37d6c..00000000 --- a/tests/integration_tests/modules/test_ssh_import_id.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Integration test for the ssh_import_id module. - -This test specifies ssh keys to be imported by the ``ssh_import_id`` module -and then checks that if the ssh keys were successfully imported. - -(This is ported from -``tests/cloud_tests/testcases/modules/ssh_import_id.yaml``.)""" - -import pytest - - -USER_DATA = """\ -#cloud-config -ssh_import_id: - - gh:powersj - - lp:smoser -""" - - -@pytest.mark.ci -class TestSshImportId: - - @pytest.mark.user_data(USER_DATA) - def test_ssh_import_id(self, client): - ssh_output = client.read_from_file( - "/home/ubuntu/.ssh/authorized_keys") - - assert '# ssh-import-id gh:powersj' in ssh_output - assert '# ssh-import-id lp:smoser' in ssh_output diff --git a/tests/integration_tests/modules/test_ssh_keys_provided.py b/tests/integration_tests/modules/test_ssh_keys_provided.py index 27d193c1..b79f18eb 100644 --- a/tests/integration_tests/modules/test_ssh_keys_provided.py +++ b/tests/integration_tests/modules/test_ssh_keys_provided.py @@ -9,7 +9,6 @@ system. import pytest - USER_DATA = """\ #cloud-config disable_root: false @@ -82,67 +81,60 @@ ssh_keys: @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestSshKeysProvided: - - def test_ssh_dsa_keys_provided(self, class_client): - """Test dsa public key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_dsa_key.pub") - assert ( - "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R" - "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM") in out - - """Test dsa private key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_dsa_key") - assert ( - "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr" - "hOVAfzZ6+jklP") in out - - def test_ssh_rsa_keys_provided(self, class_client): - """Test rsa public key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key.pub") - assert ( - "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT" - "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4") in out - - """Test rsa private key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key") - assert ( - "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un" - "RQvLZpMRdywBm") in out - - def test_ssh_rsa_certificate_provided(self, class_client): - """Test rsa certificate was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_rsa_key-cert.pub") - assert ( - "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg" - "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD") in out - - def test_ssh_certificate_updated_sshd_config(self, class_client): - """Test ssh certificate was added to /etc/ssh/sshd_config.""" - out = class_client.read_from_file("/etc/ssh/sshd_config").strip() - assert "HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub" in out - - def test_ssh_ecdsa_keys_provided(self, class_client): - """Test ecdsa public key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_ecdsa_key.pub") - assert ( - "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB" - "BBFsS5Tvky/IC/dXhE/afxxU") in out - - """Test ecdsa private key generated.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_ecdsa_key") - assert ( - "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY" - "5mpZqxgX4vcgb") in out - - def test_ssh_ed25519_keys_provided(self, class_client): - """Test ed25519 public key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_ed25519_key.pub") - assert ( - "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6" - "G15dqjQ2XkNVOEnb5") in out - - """Test ed25519 private key was imported.""" - out = class_client.read_from_file("/etc/ssh/ssh_host_ed25519_key") - assert ( - "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT" - "OhteXao0Nl5DVThJ2+Q") in out + @pytest.mark.parametrize( + "config_path,expected_out", + ( + ( + "/etc/ssh/ssh_host_dsa_key.pub", + "AAAAB3NzaC1kc3MAAACBAPkWy1zbchVIN7qTgM0/yyY8q4R" + "ZS8cNM4ZpeuE5UB/Nnr6OSU/nmbO8LuM", + ), + ( + "/etc/ssh/ssh_host_dsa_key", + "MIIBuwIBAAKBgQD5Fstc23IVSDe6k4DNP8smPKuEWUvHDTOGaXr" + "hOVAfzZ6+jklP", + ), + ( + "/etc/ssh/ssh_host_rsa_key.pub", + "AAAAB3NzaC1yc2EAAAADAQABAAABAQC0/Ho+o3eJISydO2JvIgT" + "LnZOtrxPl+fSvJfKDjoOLY0HB2eOjy2s2/2N6d9X9SGZ4", + ), + ( + "/etc/ssh/ssh_host_rsa_key", + "4DOkqNiUGl80Zp1RgZNohHUXlJMtAbrIlAVEk+mTmg7vjfyp2un" + "RQvLZpMRdywBm", + ), + ( + "/etc/ssh/ssh_host_rsa_key-cert.pub", + "AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgMpg" + "BP4Phn3L8I7Vqh7lmHKcOfIokEvSEbHDw83Y3JloAAAAD", + ), + ( + "/etc/ssh/sshd_config", + "HostCertificate /etc/ssh/ssh_host_rsa_key-cert.pub", + ), + ( + "/etc/ssh/ssh_host_ecdsa_key.pub", + "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAAB" + "BBFsS5Tvky/IC/dXhE/afxxU", + ), + ( + "/etc/ssh/ssh_host_ecdsa_key", + "AwEHoUQDQgAEWxLlO+TL8gL91eET9p/HFQbqR1A691AkJgZk3jY" + "5mpZqxgX4vcgb", + ), + ( + "/etc/ssh/ssh_host_ed25519_key.pub", + "AAAAC3NzaC1lZDI1NTE5AAAAINudAZSu4vjZpVWzId5pXmZg1M6" + "G15dqjQ2XkNVOEnb5", + ), + ( + "/etc/ssh/ssh_host_ed25519_key", + "XAAAAAtzc2gtZWQyNTUxOQAAACDbnQGUruL42aVVsyHeaV5mYNT" + "OhteXao0Nl5DVThJ2+Q", + ), + ), + ) + def test_ssh_provided_keys(self, config_path, expected_out, class_client): + out = class_client.read_from_file(config_path).strip() + assert expected_out in out diff --git a/tests/integration_tests/modules/test_ssh_keysfile.py b/tests/integration_tests/modules/test_ssh_keysfile.py new file mode 100644 index 00000000..8330a1ce --- /dev/null +++ b/tests/integration_tests/modules/test_ssh_keysfile.py @@ -0,0 +1,224 @@ +from io import StringIO + +import paramiko +import pytest +from paramiko.ssh_exception import SSHException + +from tests.integration_tests.clouds import ImageSpecification +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.util import get_test_rsa_keypair + +TEST_USER1_KEYS = get_test_rsa_keypair("test1") +TEST_USER2_KEYS = get_test_rsa_keypair("test2") +TEST_DEFAULT_KEYS = get_test_rsa_keypair("test3") + +_USERDATA = """\ +#cloud-config +bootcmd: + - {bootcmd} +ssh_authorized_keys: + - {default} +users: +- default +- name: test_user1 + ssh_authorized_keys: + - {user1} +- name: test_user2 + ssh_authorized_keys: + - {user2} +""".format( + bootcmd="{bootcmd}", + default=TEST_DEFAULT_KEYS.public_key, + user1=TEST_USER1_KEYS.public_key, + user2=TEST_USER2_KEYS.public_key, +) + + +def common_verify(client, expected_keys): + for user, filename, keys in expected_keys: + # Ensure key is in the key file + contents = client.read_from_file(filename) + if user in ["ubuntu", "root"]: + lines = contents.split("\n") + if user == "root": + # Our personal public key gets added by pycloudlib in + # addition to the default `ssh_authorized_keys` + assert len(lines) == 2 + else: + # Clouds will insert the keys we've added to our accounts + # or for our launches + assert len(lines) >= 2 + assert keys.public_key.strip() in contents + else: + assert contents.strip() == keys.public_key.strip() + + # Ensure we can actually connect + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + paramiko_key = paramiko.RSAKey.from_private_key( + StringIO(keys.private_key) + ) + + # Will fail with AuthenticationException if + # we cannot connect + ssh.connect( + client.instance.ip, + username=user, + pkey=paramiko_key, + look_for_keys=False, + allow_agent=False, + ) + + # Ensure other uses can't connect using our key + other_users = [u[0] for u in expected_keys if u[2] != keys] + for other_user in other_users: + with pytest.raises(SSHException): + print( + "trying to connect as {} with key from {}".format( + other_user, user + ) + ) + ssh.connect( + client.instance.ip, + username=other_user, + pkey=paramiko_key, + look_for_keys=False, + allow_agent=False, + ) + + # Ensure we haven't messed with any /home permissions + # See LP: #1940233 + home_dir = "/home/{}".format(user) + # Home permissions aren't consistent between releases. On ubuntu + # this can change to 750 once focal is unsupported. + if ImageSpecification.from_os_image().release in ("bionic", "focal"): + home_perms = "755" + else: + home_perms = "750" + if user == "root": + home_dir = "/root" + home_perms = "700" + assert "{} {}".format(user, home_perms) == client.execute( + 'stat -c "%U %a" {}'.format(home_dir) + ) + if client.execute("test -d {}/.ssh".format(home_dir)).ok: + assert "{} 700".format(user) == client.execute( + 'stat -c "%U %a" {}/.ssh'.format(home_dir) + ) + assert "{} 600".format(user) == client.execute( + 'stat -c "%U %a" {}'.format(filename) + ) + + # Also ensure ssh-keygen works as expected + client.execute("mkdir {}/.ssh".format(home_dir)) + assert client.execute( + "ssh-keygen -b 2048 -t rsa -f {}/.ssh/id_rsa -q -N ''".format( + home_dir + ) + ).ok + assert client.execute("test -f {}/.ssh/id_rsa".format(home_dir)) + assert client.execute("test -f {}/.ssh/id_rsa.pub".format(home_dir)) + + assert "root 755" == client.execute('stat -c "%U %a" /home') + + +DEFAULT_KEYS_USERDATA = _USERDATA.format(bootcmd='""') + + +@pytest.mark.ubuntu +@pytest.mark.user_data(DEFAULT_KEYS_USERDATA) +def test_authorized_keys_default(client: IntegrationInstance): + expected_keys = [ + ( + "test_user1", + "/home/test_user1/.ssh/authorized_keys", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/home/test_user2/.ssh/authorized_keys", + TEST_USER2_KEYS, + ), + ("ubuntu", "/home/ubuntu/.ssh/authorized_keys", TEST_DEFAULT_KEYS), + ("root", "/root/.ssh/authorized_keys", TEST_DEFAULT_KEYS), + ] + common_verify(client, expected_keys) + + +AUTHORIZED_KEYS2_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' " + "/etc/ssh/sshd_config" + ) +) + + +@pytest.mark.ubuntu +@pytest.mark.user_data(AUTHORIZED_KEYS2_USERDATA) +def test_authorized_keys2(client: IntegrationInstance): + expected_keys = [ + ( + "test_user1", + "/home/test_user1/.ssh/authorized_keys2", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/home/test_user2/.ssh/authorized_keys2", + TEST_USER2_KEYS, + ), + ("ubuntu", "/home/ubuntu/.ssh/authorized_keys2", TEST_DEFAULT_KEYS), + ("root", "/root/.ssh/authorized_keys2", TEST_DEFAULT_KEYS), + ] + common_verify(client, expected_keys) + + +NESTED_KEYS_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys %h/foo/bar/ssh/keys;' " + "/etc/ssh/sshd_config" + ) +) + + +@pytest.mark.ubuntu +@pytest.mark.user_data(NESTED_KEYS_USERDATA) +def test_nested_keys(client: IntegrationInstance): + expected_keys = [ + ("test_user1", "/home/test_user1/foo/bar/ssh/keys", TEST_USER1_KEYS), + ("test_user2", "/home/test_user2/foo/bar/ssh/keys", TEST_USER2_KEYS), + ("ubuntu", "/home/ubuntu/foo/bar/ssh/keys", TEST_DEFAULT_KEYS), + ("root", "/root/foo/bar/ssh/keys", TEST_DEFAULT_KEYS), + ] + common_verify(client, expected_keys) + + +EXTERNAL_KEYS_USERDATA = _USERDATA.format( + bootcmd=( + "sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile " + "/etc/ssh/authorized_keys /etc/ssh/authorized_keys/%u/keys;' " + "/etc/ssh/sshd_config" + ) +) + + +@pytest.mark.ubuntu +@pytest.mark.user_data(EXTERNAL_KEYS_USERDATA) +def test_external_keys(client: IntegrationInstance): + expected_keys = [ + ( + "test_user1", + "/etc/ssh/authorized_keys/test_user1/keys", + TEST_USER1_KEYS, + ), + ( + "test_user2", + "/etc/ssh/authorized_keys/test_user2/keys", + TEST_USER2_KEYS, + ), + ("ubuntu", "/etc/ssh/authorized_keys/ubuntu/keys", TEST_DEFAULT_KEYS), + ("root", "/etc/ssh/authorized_keys/root/keys", TEST_DEFAULT_KEYS), + ] + common_verify(client, expected_keys) diff --git a/tests/integration_tests/modules/test_timezone.py b/tests/integration_tests/modules/test_timezone.py deleted file mode 100644 index 111d53f7..00000000 --- a/tests/integration_tests/modules/test_timezone.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Integration test for the timezone module. - -This test specifies a timezone to be used by the ``timezone`` module -and then checks that if that timezone was respected during boot. - -(This is ported from -``tests/cloud_tests/testcases/modules/timezone.yaml``.)""" - -import pytest - - -USER_DATA = """\ -#cloud-config -timezone: US/Aleutian -""" - - -@pytest.mark.ci -class TestTimezone: - - @pytest.mark.user_data(USER_DATA) - def test_timezone(self, client): - timezone_output = client.execute( - 'date "+%Z" --date="Thu, 03 Nov 2016 00:47:00 -0400"') - assert timezone_output.strip() == "HDT" diff --git a/tests/integration_tests/modules/test_user_events.py b/tests/integration_tests/modules/test_user_events.py new file mode 100644 index 00000000..e4a4241f --- /dev/null +++ b/tests/integration_tests/modules/test_user_events.py @@ -0,0 +1,110 @@ +"""Test user-overridable events. + +This is currently limited to applying network config on BOOT events. +""" + +import re + +import pytest +import yaml + +from tests.integration_tests.instances import IntegrationInstance + + +def _add_dummy_bridge_to_netplan(client: IntegrationInstance): + # Update netplan configuration to ensure it doesn't change on reboot + netplan = yaml.safe_load( + client.execute("cat /etc/netplan/50-cloud-init.yaml") + ) + # Just a dummy bridge to do nothing + try: + netplan["network"]["bridges"]["dummy0"] = {"dhcp4": False} + except KeyError: + netplan["network"]["bridges"] = {"dummy0": {"dhcp4": False}} + + dumped_netplan = yaml.dump(netplan) + client.write_to_file("/etc/netplan/50-cloud-init.yaml", dumped_netplan) + + +@pytest.mark.lxd_container +@pytest.mark.lxd_vm +@pytest.mark.ec2 +@pytest.mark.gce +@pytest.mark.oci +@pytest.mark.openstack +def test_boot_event_disabled_by_default(client: IntegrationInstance): + log = client.read_from_file("/var/log/cloud-init.log") + if "network config is disabled" in log: + pytest.skip("network config disabled. Test doesn't apply") + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") + + _add_dummy_bridge_to_netplan(client) + client.execute("rm /var/log/cloud-init.log") + + client.restart() + log2 = client.read_from_file("/var/log/cloud-init.log") + + if "cache invalid in datasource" in log2: + # Invalid cache will get cleared, meaning we'll create a new + # "instance" and apply networking config, so events aren't + # really relevant here + pytest.skip("Test only valid for existing instances") + + # We attempt to apply network config twice on every boot. + # Ensure neither time works. + assert 2 == len( + re.findall( + r"Event Denied: scopes=\['network'\] EventType=boot[^-]", log2 + ) + ) + assert 2 == log2.count( + "Event Denied: scopes=['network'] EventType=boot-legacy" + ) + assert 2 == log2.count( + "No network config applied. Neither a new instance" + " nor datasource network update allowed" + ) + + assert "dummy0" in client.execute("ls /sys/class/net") + + +def _test_network_config_applied_on_reboot(client: IntegrationInstance): + log = client.read_from_file("/var/log/cloud-init.log") + if "network config is disabled" in log: + pytest.skip("network config disabled. Test doesn't apply") + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") + + _add_dummy_bridge_to_netplan(client) + client.execute('echo "" > /var/log/cloud-init.log') + client.restart() + + log = client.read_from_file("/var/log/cloud-init.log") + if "cache invalid in datasource" in log: + # Invalid cache will get cleared, meaning we'll create a new + # "instance" and apply networking config, so events aren't + # really relevant here + pytest.skip("Test only valid for existing instances") + + assert "Event Allowed: scope=network EventType=boot" in log + assert "Applying network configuration" in log + assert "dummy0" not in client.execute("ls /sys/class/net") + + +@pytest.mark.azure +def test_boot_event_enabled_by_default(client: IntegrationInstance): + _test_network_config_applied_on_reboot(client) + + +USER_DATA = """\ +#cloud-config +updates: + network: + when: [boot] +""" + + +@pytest.mark.user_data(USER_DATA) +def test_boot_event_enabled(client: IntegrationInstance): + _test_network_config_applied_on_reboot(client) diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py index 6a51f5a6..fddff681 100644 --- a/tests/integration_tests/modules/test_users_groups.py +++ b/tests/integration_tests/modules/test_users_groups.py @@ -1,12 +1,15 @@ -"""Integration test for the user_groups module. +"""Integration tests for the user_groups module. -This test specifies a number of users and groups via user-data, and confirms -that they have been configured correctly in the system under test. +TODO: +* This module assumes that the "ubuntu" user will be created when "default" is + specified; this will need modification to run on other OSes. """ import re import pytest +from tests.integration_tests.clouds import ImageSpecification +from tests.integration_tests.instances import IntegrationInstance USER_DATA = """\ #cloud-config @@ -41,6 +44,13 @@ AHWYPYb2FT.lbioDm2RrkJPb9BZMN1O/ @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestUsersGroups: + """Test users and groups. + + This test specifies a number of users and groups via user-data, and + confirms that they have been configured correctly in the system under test. + """ + + @pytest.mark.ubuntu @pytest.mark.parametrize( "getent_args,regex", [ @@ -73,7 +83,9 @@ class TestUsersGroups: assert re.search(regex, result.stdout) is not None, ( "'getent {}' resulted in '{}', " "but expected to match regex {}".format( - ' '.join(getent_args), result.stdout, regex)) + " ".join(getent_args), result.stdout, regex + ) + ) def test_user_root_in_secret(self, class_client): """Test root user is in 'secret' group.""" @@ -81,3 +93,33 @@ class TestUsersGroups: _, groups_str = output.split(":", maxsplit=1) groups = groups_str.split() assert "secret" in groups + + +@pytest.mark.user_data(USER_DATA) +def test_sudoers_includedir(client: IntegrationInstance): + """Ensure we don't add additional #includedir to sudoers. + + Newer versions of /etc/sudoers will use @includedir rather than + #includedir. Ensure we handle that properly and don't include an + additional #includedir when one isn't warranted. + + https://github.com/canonical/cloud-init/pull/783 + """ + if ImageSpecification.from_os_image().release in [ + "bionic", + "focal", + ]: + raise pytest.skip( + "Test requires version of sudo installed on groovy and later" + ) + client.execute("sed -i 's/#include/@include/g' /etc/sudoers") + + sudoers = client.read_from_file("/etc/sudoers") + if "@includedir /etc/sudoers.d" not in sudoers: + client.execute("echo '@includedir /etc/sudoers.d' >> /etc/sudoers") + client.instance.clean() + client.restart() + sudoers = client.read_from_file("/etc/sudoers") + + assert "#includedir" not in sudoers + assert sudoers.count("includedir /etc/sudoers.d") == 1 diff --git a/tests/integration_tests/modules/test_version_change.py b/tests/integration_tests/modules/test_version_change.py new file mode 100644 index 00000000..3168cd60 --- /dev/null +++ b/tests/integration_tests/modules/test_version_change.py @@ -0,0 +1,76 @@ +from pathlib import Path + +import pytest + +from tests.integration_tests.instances import IntegrationInstance +from tests.integration_tests.util import ASSETS_DIR, verify_clean_log + +PICKLE_PATH = Path("/var/lib/cloud/instance/obj.pkl") +TEST_PICKLE = ASSETS_DIR / "test_version_change.pkl" + + +def _assert_no_pickle_problems(log): + assert "Failed loading pickled blob" not in log + verify_clean_log(log) + + +def test_reboot_without_version_change(client: IntegrationInstance): + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected" not in log + assert "Cache compatibility status is currently unknown." not in log + _assert_no_pickle_problems(log) + + client.restart() + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected" not in log + assert "Could not determine Python version used to write cache" not in log + _assert_no_pickle_problems(log) + + # Now ensure that loading a bad pickle gives us problems + client.push_file(TEST_PICKLE, PICKLE_PATH) + client.restart() + log = client.read_from_file("/var/log/cloud-init.log") + + # no cache found is an "expected" upgrade error, and + # "Failed" means we're unable to load the pickle + assert any( + [ + "Failed loading pickled blob from {}".format(PICKLE_PATH) in log, + "no cache found" in log, + ] + ) + + +@pytest.mark.ec2 +@pytest.mark.gce +@pytest.mark.oci +@pytest.mark.openstack +@pytest.mark.lxd_container +@pytest.mark.lxd_vm +# No Azure because the cache gets purged every reboot, so we'll never +# get to the point where we need to purge cache due to version change +def test_cache_purged_on_version_change(client: IntegrationInstance): + # Start by pushing the invalid pickle so we'll hit an error if the + # cache didn't actually get purged + client.push_file(TEST_PICKLE, PICKLE_PATH) + client.execute("echo '1.0' > /var/lib/cloud/data/python-version") + client.restart() + log = client.read_from_file("/var/log/cloud-init.log") + assert "Python version change detected. Purging cache" in log + _assert_no_pickle_problems(log) + + +def test_log_message_on_missing_version_file(client: IntegrationInstance): + # Start by pushing a pickle so we can see the log message + client.push_file(TEST_PICKLE, PICKLE_PATH) + client.execute("rm /var/lib/cloud/data/python-version") + client.execute("rm /var/log/cloud-init.log") + client.restart() + log = client.read_from_file("/var/log/cloud-init.log") + if "no cache found" not in log: + # We don't expect the python version file to exist if we have no + # pre-existing cache + assert ( + "Writing python-version file. " + "Cache compatibility status is currently unknown." in log + ) diff --git a/tests/integration_tests/modules/test_write_files.py b/tests/integration_tests/modules/test_write_files.py index 15832ae3..1eb7e945 100644 --- a/tests/integration_tests/modules/test_write_files.py +++ b/tests/integration_tests/modules/test_write_files.py @@ -7,8 +7,8 @@ and then checks if those files were created during boot. ``tests/cloud_tests/testcases/modules/write_files.yaml``.)""" import base64 -import pytest +import pytest ASCII_TEXT = "ASCII text" B64_CONTENT = base64.b64encode(ASCII_TEXT.encode("utf-8")) @@ -21,6 +21,9 @@ B64_CONTENT = base64.b64encode(ASCII_TEXT.encode("utf-8")) # USER_DATA = """\ #cloud-config +users: +- default +- name: myuser write_files: - encoding: b64 content: {} @@ -41,26 +44,50 @@ write_files: H4sIAIDb/U8C/1NW1E/KzNMvzuBKTc7IV8hIzcnJVyjPL8pJ4QIA6N+MVxsAAAA= path: /root/file_gzip permissions: '0755' -""".format(B64_CONTENT.decode("ascii")) +- path: '/home/testuser/my-file' + content: | + echo 'hello world!' + defer: true + owner: 'myuser' + permissions: '0644' +""".format( + B64_CONTENT.decode("ascii") +) @pytest.mark.ci @pytest.mark.user_data(USER_DATA) class TestWriteFiles: - @pytest.mark.parametrize( - "cmd,expected_out", ( + "cmd,expected_out", + ( ("file /root/file_b64", ASCII_TEXT), ("md5sum </root/file_binary", "3801184b97bb8c6e63fa0e1eae2920d7"), - ("sha256sum </root/file_binary", ( + ( + "sha256sum </root/file_binary", "2c791c4037ea5bd7e928d6a87380f8ba" - "7a803cd83d5e4f269e28f5090f0f2c9a" - )), - ("file /root/file_gzip", - "POSIX shell script, ASCII text executable"), + "7a803cd83d5e4f269e28f5090f0f2c9a", + ), + ( + "file /root/file_gzip", + "POSIX shell script, ASCII text executable", + ), ("file /root/file_text", ASCII_TEXT), - ) + ), ) def test_write_files(self, cmd, expected_out, class_client): out = class_client.execute(cmd) assert expected_out in out + + def test_write_files_deferred(self, class_client): + """Test that write files deferred works as expected. + + Users get created after write_files module runs, so ensure that + with `defer: true`, the file gets written with correct ownership. + """ + out = class_client.read_from_file("/home/testuser/my-file") + assert "echo 'hello world!'" == out + assert ( + class_client.execute('stat -c "%U %a" /home/testuser/my-file') + == "myuser 644" + ) |