summaryrefslogtreecommitdiff
path: root/cloudinit/config/tests/test_set_passwords.py
blob: a2ea5ec42d21dbba6ba884e5bde74261faea6251 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# This file is part of cloud-init. See LICENSE file for license information.

import mock

from cloudinit.config import cc_set_passwords as setpass
from cloudinit.tests.helpers import CiTestCase
from cloudinit import util

MODPATH = "cloudinit.config.cc_set_passwords."


class TestHandleSshPwauth(CiTestCase):
    """Test cc_set_passwords handling of ssh_pwauth in handle_ssh_pwauth."""

    # Tests below read captured log output through self.logs.
    with_logs = True

    @mock.patch(MODPATH + "util.subp")
    def test_unknown_value_logs_warning(self, m_subp):
        """An unrecognized ssh_pwauth value is logged and runs no command."""
        setpass.handle_ssh_pwauth("floo")
        self.assertIn("Unrecognized value: ssh_pwauth=floo",
                      self.logs.getvalue())
        m_subp.assert_not_called()

    @mock.patch(MODPATH + "update_ssh_config", return_value=True)
    @mock.patch(MODPATH + "util.subp")
    def test_systemctl_as_service_cmd(self, m_subp, m_update_ssh_config):
        """If systemctl in service cmd: systemctl restart name."""
        setpass.handle_ssh_pwauth(
            True, service_cmd=["systemctl"], service_name="myssh")
        # Checks the most recent subp call, like comparing call_args.
        m_subp.assert_called_with(["systemctl", "restart", "myssh"])

    @mock.patch(MODPATH + "update_ssh_config", return_value=True)
    @mock.patch(MODPATH + "util.subp")
    def test_service_as_service_cmd(self, m_subp, m_update_ssh_config):
        """If systemctl is not in service cmd: service name restart."""
        setpass.handle_ssh_pwauth(
            True, service_cmd=["service"], service_name="myssh")
        # Checks the most recent subp call, like comparing call_args.
        m_subp.assert_called_with(["service", "myssh", "restart"])

    @mock.patch(MODPATH + "update_ssh_config", return_value=False)
    @mock.patch(MODPATH + "util.subp")
    def test_not_restarted_if_not_updated(self, m_subp, m_update_ssh_config):
        """If config is not updated, then no system restart should be done."""
        setpass.handle_ssh_pwauth(True)
        m_subp.assert_not_called()
        self.assertIn("No need to restart ssh", self.logs.getvalue())

    @mock.patch(MODPATH + "update_ssh_config", return_value=True)
    @mock.patch(MODPATH + "util.subp")
    def test_unchanged_does_nothing(self, m_subp, m_update_ssh_config):
        """If 'unchanged', then no updates to config and no restart."""
        setpass.handle_ssh_pwauth(
            "unchanged", service_cmd=["systemctl"], service_name="myssh")
        m_update_ssh_config.assert_not_called()
        m_subp.assert_not_called()

    @mock.patch(MODPATH + "util.subp")
    def test_valid_change_values(self, m_subp):
        """If value is a valid change value, then update should be called."""
        upname = MODPATH + "update_ssh_config"
        optname = "PasswordAuthentication"
        for value in util.FALSE_STRINGS + util.TRUE_STRINGS:
            optval = "yes" if value in util.TRUE_STRINGS else "no"
            # update_ssh_config is patched to return False on every pass,
            # and the test expects subp to stay uncalled throughout.
            with mock.patch(upname, return_value=False) as m_update:
                setpass.handle_ssh_pwauth(value)
                m_update.assert_called_with({optname: optval})
        m_subp.assert_not_called()


class TestSetPasswordsHandle(CiTestCase):
    """Test cc_set_passwords.handle"""

    # Tests below read captured log output through self.logs.
    with_logs = True

    def test_handle_on_empty_config(self):
        """handle only logs ssh config left unchanged when config is empty."""
        cloud = self.tmp_cloud(distro='ubuntu')
        setpass.handle(
            'IGNORED', cfg={}, cloud=cloud, log=self.logger, args=[])
        self.assertEqual(
            "DEBUG: Leaving ssh config 'PasswordAuthentication' unchanged. "
            'ssh_pwauth=None\n',
            self.logs.getvalue())

    def test_handle_on_chpasswd_list_parses_common_hashes(self):
        """handle parses common password hashes."""
        cloud = self.tmp_cloud(distro='ubuntu')
        valid_hashed_pwds = [
            'root:$2y$10$8BQjxjVByHA/Ee.O1bCXtO8S7Y5WojbXWqnqYpUW.BrPx/'
            'Dlew1Va',
            'ubuntu:$6$5hOurLPO$naywm3Ce0UlmZg9gG2Fl9acWCVEoakMMC7dR52q'
            'SDexZbrN9z8yHxhUM2b.sxpguSwOlbOQSW/HpXazGGx3oo1']
        cfg = {'chpasswd': {'list': valid_hashed_pwds}}
        # Patch subp only around handle() so that m_subp records exactly the
        # calls made by handle; a second, decorator-level patch of the same
        # target was shadowed by this one and has been dropped.
        with mock.patch(MODPATH + 'util.subp') as m_subp:
            setpass.handle(
                'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
        self.assertIn(
            'DEBUG: Handling input for chpasswd as list.',
            self.logs.getvalue())
        self.assertIn(
            "DEBUG: Setting hashed password for ['root', 'ubuntu']",
            self.logs.getvalue())
        # The hashed entries are expected to be fed to a single
        # 'chpasswd -e' call, newline-joined with a trailing newline.
        self.assertEqual(
            [mock.call(['chpasswd', '-e'],
             '\n'.join(valid_hashed_pwds) + '\n')],
            m_subp.call_args_list)

# vi: ts=4 expandtab