1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# This file is part of cloud-init. See LICENSE file for license information.
from unittest import mock
from cloudinit.config import cc_set_passwords as setpass
from cloudinit.tests.helpers import CiTestCase
from cloudinit import util
# Dotted-path prefix of the module under test; the tests append an attribute
# name to it to build mock.patch() targets.
MODPATH = "cloudinit.config.cc_set_passwords."
class TestHandleSshPwauth(CiTestCase):
    """Test cc_set_passwords handling of ssh_pwauth in handle_ssh_pwauth."""

    # Tests read captured log output through self.logs; this flag presumably
    # asks CiTestCase to set that capture up (confirm in the helper class).
    with_logs = True

    @mock.patch(MODPATH + "util.subp")
    def test_unknown_value_logs_warning(self, m_subp):
        """An unrecognized ssh_pwauth value is logged; no command is run."""
        setpass.handle_ssh_pwauth("floo")
        self.assertIn("Unrecognized value: ssh_pwauth=floo",
                      self.logs.getvalue())
        m_subp.assert_not_called()

    @mock.patch(MODPATH + "update_ssh_config", return_value=True)
    @mock.patch(MODPATH + "util.subp")
    def test_systemctl_as_service_cmd(self, m_subp, m_update_ssh_config):
        """If systemctl in service cmd: systemctl restart name."""
        setpass.handle_ssh_pwauth(
            True, service_cmd=["systemctl"], service_name="myssh")
        self.assertEqual(mock.call(["systemctl", "restart", "myssh"]),
                         m_subp.call_args)

    @mock.patch(MODPATH + "update_ssh_config", return_value=True)
    @mock.patch(MODPATH + "util.subp")
    def test_service_as_service_cmd(self, m_subp, m_update_ssh_config):
        """If service in service cmd: service name restart."""
        setpass.handle_ssh_pwauth(
            True, service_cmd=["service"], service_name="myssh")
        self.assertEqual(mock.call(["service", "myssh", "restart"]),
                         m_subp.call_args)

    @mock.patch(MODPATH + "update_ssh_config", return_value=False)
    @mock.patch(MODPATH + "util.subp")
    def test_not_restarted_if_not_updated(self, m_subp, m_update_ssh_config):
        """If config is not updated, then no system restart should be done."""
        setpass.handle_ssh_pwauth(True)
        m_subp.assert_not_called()
        self.assertIn("No need to restart SSH", self.logs.getvalue())

    @mock.patch(MODPATH + "update_ssh_config", return_value=True)
    @mock.patch(MODPATH + "util.subp")
    def test_unchanged_does_nothing(self, m_subp, m_update_ssh_config):
        """If 'unchanged', then no updates to config and no restart."""
        setpass.handle_ssh_pwauth(
            "unchanged", service_cmd=["systemctl"], service_name="myssh")
        m_update_ssh_config.assert_not_called()
        m_subp.assert_not_called()

    @mock.patch(MODPATH + "util.subp")
    def test_valid_change_values(self, m_subp):
        """If value is a valid change value, then update should be called."""
        upname = MODPATH + "update_ssh_config"
        optname = "PasswordAuthentication"
        for value in util.FALSE_STRINGS + util.TRUE_STRINGS:
            optval = "yes" if value in util.TRUE_STRINGS else "no"
            # subTest labels any failure with the input value that caused it.
            with self.subTest(value=value):
                with mock.patch(upname, return_value=False) as m_update:
                    setpass.handle_ssh_pwauth(value)
                m_update.assert_called_with({optname: optval})
        # update_ssh_config is mocked to return False for every value, and
        # no command is expected to have been run for any of them.
        m_subp.assert_not_called()
class TestSetPasswordsHandle(CiTestCase):
    """Test cc_set_passwords.handle"""

    # Tests read captured log output through self.logs; this flag presumably
    # asks CiTestCase to set that capture up (confirm in the helper class).
    with_logs = True

    def test_handle_on_empty_config(self, *args):
        """handle logs that no password has changed when config is empty."""
        cloud = self.tmp_cloud(distro='ubuntu')
        setpass.handle(
            'IGNORED', cfg={}, cloud=cloud, log=self.logger, args=[])
        self.assertEqual(
            "DEBUG: Leaving SSH config 'PasswordAuthentication' unchanged. "
            'ssh_pwauth=None\n',
            self.logs.getvalue())

    @mock.patch(MODPATH + "util.is_FreeBSD")
    @mock.patch(MODPATH + "util.subp")
    def test_handle_on_chpasswd_list_parses_common_hashes(self, m_subp,
                                                          m_is_freebsd):
        """handle parses common password hashes."""
        # Pin the platform check, as the sibling tests do, so the chpasswd
        # expectation below does not depend on the host running the tests.
        m_is_freebsd.return_value = False
        cloud = self.tmp_cloud(distro='ubuntu')
        valid_hashed_pwds = [
            'root:$2y$10$8BQjxjVByHA/Ee.O1bCXtO8S7Y5WojbXWqnqYpUW.BrPx/'
            'Dlew1Va',
            'ubuntu:$6$5hOurLPO$naywm3Ce0UlmZg9gG2Fl9acWCVEoakMMC7dR52q'
            'SDexZbrN9z8yHxhUM2b.sxpguSwOlbOQSW/HpXazGGx3oo1']
        cfg = {'chpasswd': {'list': valid_hashed_pwds}}
        # The decorator already patches util.subp; a second nested patch
        # would only shadow the injected mock.
        setpass.handle(
            'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
        self.assertIn(
            'DEBUG: Handling input for chpasswd as list.',
            self.logs.getvalue())
        self.assertIn(
            "DEBUG: Setting hashed password for ['root', 'ubuntu']",
            self.logs.getvalue())
        self.assertEqual(
            [mock.call(['chpasswd', '-e'],
                       '\n'.join(valid_hashed_pwds) + '\n')],
            m_subp.call_args_list)

    @mock.patch(MODPATH + "util.is_FreeBSD")
    @mock.patch(MODPATH + "util.subp")
    def test_freebsd_calls_custom_pw_cmds_to_set_and_expire_passwords(
            self, m_subp, m_is_freebsd):
        """FreeBSD calls custom pw commands instead of chpasswd and passwd"""
        m_is_freebsd.return_value = True
        cloud = self.tmp_cloud(distro='freebsd')
        valid_pwds = ['ubuntu:passw0rd']
        cfg = {'chpasswd': {'list': valid_pwds}}
        setpass.handle(
            'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
        self.assertEqual([
            mock.call(['pw', 'usermod', 'ubuntu', '-h', '0'], data='passw0rd',
                      logstring="chpasswd for ubuntu"),
            mock.call(['pw', 'usermod', 'ubuntu', '-p', '01-Jan-1970'])],
            m_subp.call_args_list)

    @mock.patch(MODPATH + "util.is_FreeBSD")
    @mock.patch(MODPATH + "util.subp")
    def test_handle_on_chpasswd_list_creates_random_passwords(self, m_subp,
                                                              m_is_freebsd):
        """handle does not pass 'R'/'RANDOM' placeholders on to chpasswd."""
        m_is_freebsd.return_value = False
        cloud = self.tmp_cloud(distro='ubuntu')
        valid_random_pwds = [
            'root:R',
            'ubuntu:RANDOM']
        cfg = {'chpasswd': {'expire': 'false', 'list': valid_random_pwds}}
        # The decorator already patches util.subp; a second nested patch
        # would only shadow the injected mock.
        setpass.handle(
            'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
        self.assertIn(
            'DEBUG: Handling input for chpasswd as list.',
            self.logs.getvalue())
        # The replacement passwords are not known to the test, so it can only
        # check that the placeholder entries were not forwarded verbatim.
        self.assertNotEqual(
            [mock.call(['chpasswd'],
                       '\n'.join(valid_random_pwds) + '\n')],
            m_subp.call_args_list)
# vi: ts=4 expandtab
|