summaryrefslogtreecommitdiff
path: root/tests/integration_tests/modules/test_ssh_keysfile.py
blob: f82d76494ed76ce4e31832a4adf2e762873e7aa8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import paramiko
import pytest
from io import StringIO
from paramiko.ssh_exception import SSHException

from tests.integration_tests.instances import IntegrationInstance
from tests.integration_tests.util import get_test_rsa_keypair

TEST_USER1_KEYS = get_test_rsa_keypair('test1')
TEST_USER2_KEYS = get_test_rsa_keypair('test2')
TEST_DEFAULT_KEYS = get_test_rsa_keypair('test3')

USERDATA = """\
#cloud-config
bootcmd:
 - sed -i 's;#AuthorizedKeysFile.*;AuthorizedKeysFile /etc/ssh/authorized_keys %h/.ssh/authorized_keys2;' /etc/ssh/sshd_config
ssh_authorized_keys:
 - {default}
users:
- default
- name: test_user1
  ssh_authorized_keys:
    - {user1}
- name: test_user2
  ssh_authorized_keys:
    - {user2}
""".format(  # noqa: E501
    default=TEST_DEFAULT_KEYS.public_key,
    user1=TEST_USER1_KEYS.public_key,
    user2=TEST_USER2_KEYS.public_key,
)


@pytest.mark.ubuntu
@pytest.mark.user_data(USERDATA)
def test_authorized_keys(client: IntegrationInstance):
    expected_keys = [
        ('test_user1', '/home/test_user1/.ssh/authorized_keys2',
         TEST_USER1_KEYS),
        ('test_user2', '/home/test_user2/.ssh/authorized_keys2',
         TEST_USER2_KEYS),
        ('ubuntu', '/home/ubuntu/.ssh/authorized_keys2',
         TEST_DEFAULT_KEYS),
        ('root', '/root/.ssh/authorized_keys2', TEST_DEFAULT_KEYS),
    ]

    for user, filename, keys in expected_keys:
        contents = client.read_from_file(filename)
        if user in ['ubuntu', 'root']:
            # Our personal public key gets added by pycloudlib
            lines = contents.split('\n')
            assert len(lines) == 2
            assert keys.public_key.strip() in contents
        else:
            assert contents.strip() == keys.public_key.strip()

        # Ensure we can actually connect
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        paramiko_key = paramiko.RSAKey.from_private_key(StringIO(
            keys.private_key))

        # Will fail with AuthenticationException if
        # we cannot connect
        ssh.connect(
            client.instance.ip,
            username=user,
            pkey=paramiko_key,
            look_for_keys=False,
            allow_agent=False,
        )

        # Ensure other uses can't connect using our key
        other_users = [u[0] for u in expected_keys if u[2] != keys]
        for other_user in other_users:
            with pytest.raises(SSHException):
                print('trying to connect as {} with key from {}'.format(
                    other_user, user))
                ssh.connect(
                    client.instance.ip,
                    username=other_user,
                    pkey=paramiko_key,
                    look_for_keys=False,
                    allow_agent=False,
                )