summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2018-04-20 12:22:23 -0600
committerChad Smith <chad.smith@canonical.com>2018-04-20 12:22:23 -0600
commit4952a8545b61ceb2fe26a933d2f64020d0281703 (patch)
tree6d765537ce93bf756eef3910e69edbe27ee63c26 /tests/unittests
parent1081962eacf2814fea6f4fa3255c530de14e4a24 (diff)
downloadvyos-cloud-init-4952a8545b61ceb2fe26a933d2f64020d0281703.tar.gz
vyos-cloud-init-4952a8545b61ceb2fe26a933d2f64020d0281703.zip
set_passwords: Add newline to end of sshd config, only restart if updated.
This admittedly does a fairly extensive re-factor to simply add a newline to the end of sshd_config. It makes the ssh_config updating portion of set_passwords more testable and adds tests for that. The new function is in 'update_ssh_config_lines' which allows you to update a config with multiple changes even though only a single one is currently used. We also only restart the ssh daemon now if a change was made to the config file. Before it was always restarted if the user specified a value for ssh_pwauth other than 'unchanged'. Thanks to Lorens Kockum for initial diagnosis and patch. LP: #1677205
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/test_sshutil.py97
1 files changed, 94 insertions, 3 deletions
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
index 4c62c8be..73ae897f 100644
--- a/tests/unittests/test_sshutil.py
+++ b/tests/unittests/test_sshutil.py
@@ -4,6 +4,7 @@ from mock import patch
from cloudinit import ssh_util
from cloudinit.tests import helpers as test_helpers
+from cloudinit import util
VALID_CONTENT = {
@@ -56,7 +57,7 @@ TEST_OPTIONS = (
'user \"root\".\';echo;sleep 10"')
-class TestAuthKeyLineParser(test_helpers.TestCase):
+class TestAuthKeyLineParser(test_helpers.CiTestCase):
def test_simple_parse(self):
# test key line with common 3 fields (keytype, base64, comment)
@@ -126,7 +127,7 @@ class TestAuthKeyLineParser(test_helpers.TestCase):
self.assertFalse(key.valid())
-class TestUpdateAuthorizedKeys(test_helpers.TestCase):
+class TestUpdateAuthorizedKeys(test_helpers.CiTestCase):
def test_new_keys_replace(self):
"""new entries with the same base64 should replace old."""
@@ -168,7 +169,7 @@ class TestUpdateAuthorizedKeys(test_helpers.TestCase):
self.assertEqual(expected, found)
-class TestParseSSHConfig(test_helpers.TestCase):
+class TestParseSSHConfig(test_helpers.CiTestCase):
def setUp(self):
self.load_file_patch = patch('cloudinit.ssh_util.util.load_file')
@@ -235,4 +236,94 @@ class TestParseSSHConfig(test_helpers.TestCase):
self.assertEqual('foo', ret[0].key)
self.assertEqual('bar', ret[0].value)
+
+class TestUpdateSshConfigLines(test_helpers.CiTestCase):
+ """Test the update_ssh_config_lines method."""
+ exlines = [
+ "#PasswordAuthentication yes",
+ "UsePAM yes",
+ "# Comment line",
+ "AcceptEnv LANG LC_*",
+ "X11Forwarding no",
+ ]
+ pwauth = "PasswordAuthentication"
+
+ def check_line(self, line, opt, val):
+ self.assertEqual(line.key, opt.lower())
+ self.assertEqual(line.value, val)
+ self.assertIn(opt, str(line))
+ self.assertIn(val, str(line))
+
+ def test_new_option_added(self):
+ """A single update of non-existing option."""
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, {'MyKey': 'MyVal'})
+ self.assertEqual(['MyKey'], result)
+ self.check_line(lines[-1], "MyKey", "MyVal")
+
+ def test_commented_out_not_updated_but_appended(self):
+ """Implementation does not un-comment and update lines."""
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, {self.pwauth: "no"})
+ self.assertEqual([self.pwauth], result)
+ self.check_line(lines[-1], self.pwauth, "no")
+
+ def test_single_option_updated(self):
+ """A single update should have change made and line updated."""
+ opt, val = ("UsePAM", "no")
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, {opt: val})
+ self.assertEqual([opt], result)
+ self.check_line(lines[1], opt, val)
+
+ def test_multiple_updates_with_add(self):
+ """Verify multiple updates some added some changed, some not."""
+ updates = {"UsePAM": "no", "X11Forwarding": "no", "NewOpt": "newval",
+ "AcceptEnv": "LANG ADD LC_*"}
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, updates)
+ self.assertEqual(set(["UsePAM", "NewOpt", "AcceptEnv"]), set(result))
+ self.check_line(lines[3], "AcceptEnv", updates["AcceptEnv"])
+
+ def test_return_empty_if_no_changes(self):
+ """If there are no changes, then return should be empty list."""
+ updates = {"UsePAM": "yes"}
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, updates)
+ self.assertEqual([], result)
+ self.assertEqual(self.exlines, [str(l) for l in lines])
+
+ def test_keycase_not_modified(self):
+ """Original case of key should not be changed on update.
+ This behavior is to keep original config as much intact as can be."""
+ updates = {"usepam": "no"}
+ lines = ssh_util.parse_ssh_config_lines(list(self.exlines))
+ result = ssh_util.update_ssh_config_lines(lines, updates)
+ self.assertEqual(["usepam"], result)
+ self.assertEqual("UsePAM no", str(lines[1]))
+
+
+class TestUpdateSshConfig(test_helpers.CiTestCase):
+ cfgdata = '\n'.join(["#Option val", "MyKey ORIG_VAL", ""])
+
+ def test_modified(self):
+ mycfg = self.tmp_path("ssh_config_1")
+ util.write_file(mycfg, self.cfgdata)
+ ret = ssh_util.update_ssh_config({"MyKey": "NEW_VAL"}, mycfg)
+ self.assertTrue(ret)
+ found = util.load_file(mycfg)
+ self.assertEqual(self.cfgdata.replace("ORIG_VAL", "NEW_VAL"), found)
+ # assert there is a newline at end of file (LP: #1677205)
+ self.assertEqual('\n', found[-1])
+
+ def test_not_modified(self):
+ mycfg = self.tmp_path("ssh_config_2")
+ util.write_file(mycfg, self.cfgdata)
+ with patch("cloudinit.ssh_util.util.write_file") as m_write_file:
+ ret = ssh_util.update_ssh_config({"MyKey": "ORIG_VAL"}, mycfg)
+ self.assertFalse(ret)
+ self.assertEqual(self.cfgdata, util.load_file(mycfg))
+ m_write_file.assert_not_called()
+
+
# vi: ts=4 expandtab