summaryrefslogtreecommitdiff
path: root/cloudinit/config/tests/test_ssh.py
blob: c8a4271f52df1a0ac668668f268877a6f8b10c28 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# This file is part of cloud-init. See LICENSE file for license information.


from cloudinit.config import cc_ssh
from cloudinit import ssh_util
from cloudinit.tests.helpers import CiTestCase, mock

# Dotted prefix of the module under test; it is concatenated with attribute
# paths to build the mock.patch targets used by the tests in this file.
MODPATH = "cloudinit.config.cc_ssh."


@mock.patch(MODPATH + "ssh_util.setup_user_keys")
class TestHandleSsh(CiTestCase):
    """Exercise cc_ssh.apply_credentials and cc_ssh.handle.

    ssh_util.setup_user_keys is patched for every test method; the resulting
    mock arrives as the last positional argument (m_setup_keys) and each test
    asserts on the calls it recorded.
    """

    @staticmethod
    def _redirect_options(login_user):
        """Render ssh_util.DISABLE_USER_OPTS for a disabled root account.

        "$USER" is replaced with login_user, then "$DISABLE_USER" is
        replaced with "root".
        """
        template = ssh_util.DISABLE_USER_OPTS
        return template.replace("$USER", login_user).replace(
            "$DISABLE_USER", "root")

    def test_apply_credentials_with_user(self, m_setup_keys):
        """Keys are set up for the given user and for root (no options)."""
        pubkeys = ["key1"]
        login = "clouduser"
        cc_ssh.apply_credentials(
            pubkeys, login, False, ssh_util.DISABLE_USER_OPTS)
        expected_calls = [
            mock.call(set(pubkeys), login),
            mock.call(set(pubkeys), "root", options=""),
        ]
        self.assertEqual(expected_calls, m_setup_keys.call_args_list)

    def test_apply_credentials_with_no_user(self, m_setup_keys):
        """With user None, keys are set up for root only (no options)."""
        pubkeys = ["key1"]
        cc_ssh.apply_credentials(
            pubkeys, None, False, ssh_util.DISABLE_USER_OPTS)
        expected_calls = [mock.call(set(pubkeys), "root", options="")]
        self.assertEqual(expected_calls, m_setup_keys.call_args_list)

    def test_apply_credentials_with_user_disable_root(self, m_setup_keys):
        """Keys are set up for the user; root gets the rendered options."""
        pubkeys = ["key1"]
        login = "clouduser"
        cc_ssh.apply_credentials(
            pubkeys, login, True, ssh_util.DISABLE_USER_OPTS)
        expected_calls = [
            mock.call(set(pubkeys), login),
            mock.call(set(pubkeys), "root",
                      options=self._redirect_options(login)),
        ]
        self.assertEqual(expected_calls, m_setup_keys.call_args_list)

    def test_apply_credentials_with_no_user_disable_root(self, m_setup_keys):
        """With user None, root options are rendered with "NONE" as $USER."""
        pubkeys = ["key1"]
        cc_ssh.apply_credentials(
            pubkeys, None, True, ssh_util.DISABLE_USER_OPTS)
        expected_calls = [
            mock.call(set(pubkeys), "root",
                      options=self._redirect_options("NONE")),
        ]
        self.assertEqual(expected_calls, m_setup_keys.call_args_list)

    @mock.patch(MODPATH + "glob.glob")
    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
    @mock.patch(MODPATH + "os.path.exists")
    def test_handle_no_cfg(self, m_path_exists, m_nug,
                           m_glob, m_setup_keys):
        """Test handle with no config ignores generating existing keyfiles."""
        pubkeys = ["key1"]
        # The glob matches no key files, which prevents their removal.
        m_glob.return_value = []
        # Make os.path.exists report True to short-circuit key writing.
        m_path_exists.return_value = True
        # No users and no groups are configured.
        m_nug.return_value = ([], {})
        cloud = self.tmp_cloud(
            distro="ubuntu", metadata={"public-keys": pubkeys})

        cc_ssh.handle("name", {}, cloud, None, None)

        m_glob.assert_called_once_with("/etc/ssh/ssh_host_*key*")
        # The mock call list treats a list operand as a contiguous run of
        # calls, so these four existence checks must appear in this order.
        host_key_checks = [
            mock.call("/etc/ssh/ssh_host_rsa_key"),
            mock.call("/etc/ssh/ssh_host_dsa_key"),
            mock.call("/etc/ssh/ssh_host_ecdsa_key"),
            mock.call("/etc/ssh/ssh_host_ed25519_key"),
        ]
        self.assertIn(host_key_checks, m_path_exists.call_args_list)
        expected_calls = [
            mock.call(set(pubkeys), "root",
                      options=self._redirect_options("NONE")),
        ]
        self.assertEqual(expected_calls, m_setup_keys.call_args_list)

    @mock.patch(MODPATH + "glob.glob")
    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
    @mock.patch(MODPATH + "os.path.exists")
    def test_handle_no_cfg_and_default_root(self, m_path_exists, m_nug,
                                            m_glob, m_setup_keys):
        """Test handle with no config and a default distro user."""
        pubkeys = ["key1"]
        login = "clouduser"
        # The glob matches no key files, which prevents their removal.
        m_glob.return_value = []
        # Make os.path.exists report True to short-circuit key writing.
        m_path_exists.return_value = True
        m_nug.return_value = ({login: {"default": login}}, {})
        cloud = self.tmp_cloud(
            distro="ubuntu", metadata={"public-keys": pubkeys})

        cc_ssh.handle("name", {}, cloud, None, None)

        expected_calls = [
            mock.call(set(pubkeys), login),
            mock.call(set(pubkeys), "root",
                      options=self._redirect_options(login)),
        ]
        self.assertEqual(expected_calls, m_setup_keys.call_args_list)

    @mock.patch(MODPATH + "glob.glob")
    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
    @mock.patch(MODPATH + "os.path.exists")
    def test_handle_cfg_with_explicit_disable_root(self, m_path_exists, m_nug,
                                                   m_glob, m_setup_keys):
        """Test handle with explicit disable_root and a default distro user."""
        # Same scenario as test_handle_no_cfg_and_default_root, except that
        # disable_root is given explicitly in the config.
        pubkeys = ["key1"]
        login = "clouduser"
        # The glob matches no key files, which prevents their removal.
        m_glob.return_value = []
        # Make os.path.exists report True to short-circuit key writing.
        m_path_exists.return_value = True
        m_nug.return_value = ({login: {"default": login}}, {})
        cloud = self.tmp_cloud(
            distro="ubuntu", metadata={"public-keys": pubkeys})

        cc_ssh.handle("name", {"disable_root": True}, cloud, None, None)

        expected_calls = [
            mock.call(set(pubkeys), login),
            mock.call(set(pubkeys), "root",
                      options=self._redirect_options(login)),
        ]
        self.assertEqual(expected_calls, m_setup_keys.call_args_list)

    @mock.patch(MODPATH + "glob.glob")
    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
    @mock.patch(MODPATH + "os.path.exists")
    def test_handle_cfg_without_disable_root(self, m_path_exists, m_nug,
                                             m_glob, m_setup_keys):
        """Test handle with disable_root == False."""
        # With disable_root False the ssh redirect for root is skipped, so
        # root is expected to receive empty options.
        pubkeys = ["key1"]
        login = "clouduser"
        # The glob matches no key files, which prevents their removal.
        m_glob.return_value = []
        # Make os.path.exists report True to short-circuit key writing.
        m_path_exists.return_value = True
        m_nug.return_value = ({login: {"default": login}}, {})
        cloud = self.tmp_cloud(
            distro="ubuntu", metadata={"public-keys": pubkeys})
        cloud.get_public_ssh_keys = mock.Mock(return_value=pubkeys)

        cc_ssh.handle("name", {"disable_root": False}, cloud, None, None)

        expected_calls = [
            mock.call(set(pubkeys), login),
            mock.call(set(pubkeys), "root", options=""),
        ]
        self.assertEqual(expected_calls, m_setup_keys.call_args_list)