summaryrefslogtreecommitdiff
path: root/cloudinit/config/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/config/tests')
-rw-r--r--cloudinit/config/tests/test_mounts.py33
-rw-r--r--cloudinit/config/tests/test_ssh.py68
2 files changed, 97 insertions, 4 deletions
diff --git a/cloudinit/config/tests/test_mounts.py b/cloudinit/config/tests/test_mounts.py
index 764a33e3..56510fd6 100644
--- a/cloudinit/config/tests/test_mounts.py
+++ b/cloudinit/config/tests/test_mounts.py
@@ -4,6 +4,7 @@ from unittest import mock
import pytest
from cloudinit.config.cc_mounts import create_swapfile
+from cloudinit.subp import ProcessExecutionError
M_PATH = 'cloudinit.config.cc_mounts.'
@@ -26,3 +27,35 @@ class TestCreateSwapfile:
create_swapfile(fname, '')
assert mock.call(['mkswap', fname]) in m_subp.call_args_list
+
+ @mock.patch(M_PATH + "util.get_mount_info")
+ @mock.patch(M_PATH + "subp.subp")
+ def test_fallback_from_fallocate_to_dd(
+ self, m_subp, m_get_mount_info, caplog, tmpdir
+ ):
+ swap_file = tmpdir.join("swap-file")
+ fname = str(swap_file)
+
+ def subp_side_effect(cmd, *args, **kwargs):
+ # Mock fallocate failing, to initiate fallback
+ if cmd[0] == "fallocate":
+ raise ProcessExecutionError()
+
+ m_subp.side_effect = subp_side_effect
+ # Use ext4 so both fallocate and dd are valid swap creation methods
+ m_get_mount_info.return_value = (mock.ANY, "ext4")
+
+ create_swapfile(fname, "")
+
+ cmds = [args[0][0] for args, _kwargs in m_subp.call_args_list]
+ assert "fallocate" in cmds, "fallocate was not called"
+ assert "dd" in cmds, "fallocate failure did not fallback to dd"
+
+ assert cmds.index("dd") > cmds.index(
+ "fallocate"
+ ), "dd ran before fallocate"
+
+ assert mock.call(["mkswap", fname]) in m_subp.call_args_list
+
+ msg = "fallocate swap creation failed, will attempt with dd"
+ assert msg in caplog.text
diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py
index 0c554414..87ccdb60 100644
--- a/cloudinit/config/tests/test_ssh.py
+++ b/cloudinit/config/tests/test_ssh.py
@@ -10,6 +10,8 @@ import logging
LOG = logging.getLogger(__name__)
MODPATH = "cloudinit.config.cc_ssh."
+KEY_NAMES_NO_DSA = [name for name in cc_ssh.GENERATE_KEY_NAMES
+ if name not in 'dsa']
@mock.patch(MODPATH + "ssh_util.setup_user_keys")
@@ -25,7 +27,7 @@ class TestHandleSsh(CiTestCase):
}
self.test_hostkey_files = []
hostkey_tmpdir = self.tmp_dir()
- for key_type in ['dsa', 'ecdsa', 'ed25519', 'rsa']:
+ for key_type in cc_ssh.GENERATE_KEY_NAMES:
key_data = self.test_hostkeys[key_type]
filename = 'ssh_host_%s_key.pub' % key_type
filepath = os.path.join(hostkey_tmpdir, filename)
@@ -223,7 +225,7 @@ class TestHandleSsh(CiTestCase):
cfg = {}
expected_call = [self.test_hostkeys[key_type] for key_type
- in ['ecdsa', 'ed25519', 'rsa']]
+ in KEY_NAMES_NO_DSA]
cc_ssh.handle("name", cfg, cloud, LOG, None)
self.assertEqual([mock.call(expected_call)],
cloud.datasource.publish_host_keys.call_args_list)
@@ -252,7 +254,7 @@ class TestHandleSsh(CiTestCase):
cfg = {'ssh_publish_hostkeys': {'enabled': True}}
expected_call = [self.test_hostkeys[key_type] for key_type
- in ['ecdsa', 'ed25519', 'rsa']]
+ in KEY_NAMES_NO_DSA]
cc_ssh.handle("name", cfg, cloud, LOG, None)
self.assertEqual([mock.call(expected_call)],
cloud.datasource.publish_host_keys.call_args_list)
@@ -339,7 +341,65 @@ class TestHandleSsh(CiTestCase):
cfg = {'ssh_publish_hostkeys': {'enabled': True,
'blacklist': []}}
expected_call = [self.test_hostkeys[key_type] for key_type
- in ['dsa', 'ecdsa', 'ed25519', 'rsa']]
+ in cc_ssh.GENERATE_KEY_NAMES]
cc_ssh.handle("name", cfg, cloud, LOG, None)
self.assertEqual([mock.call(expected_call)],
cloud.datasource.publish_host_keys.call_args_list)
+
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "util.write_file")
+ def test_handle_ssh_keys_in_cfg(self, m_write_file, m_nug, m_setup_keys):
+ """Test handle with ssh keys and certificate."""
+ # Populate a config dictionary to pass to handle() as well
+ # as the expected file-writing calls.
+ cfg = {"ssh_keys": {}}
+
+ expected_calls = []
+ for key_type in cc_ssh.GENERATE_KEY_NAMES:
+ private_name = "{}_private".format(key_type)
+ public_name = "{}_public".format(key_type)
+ cert_name = "{}_certificate".format(key_type)
+
+ # Actual key contents don"t have to be realistic
+ private_value = "{}_PRIVATE_KEY".format(key_type)
+ public_value = "{}_PUBLIC_KEY".format(key_type)
+ cert_value = "{}_CERT_KEY".format(key_type)
+
+ cfg["ssh_keys"][private_name] = private_value
+ cfg["ssh_keys"][public_name] = public_value
+ cfg["ssh_keys"][cert_name] = cert_value
+
+ expected_calls.extend([
+ mock.call(
+ '/etc/ssh/ssh_host_{}_key'.format(key_type),
+ private_value,
+ 384
+ ),
+ mock.call(
+ '/etc/ssh/ssh_host_{}_key.pub'.format(key_type),
+ public_value,
+ 384
+ ),
+ mock.call(
+ '/etc/ssh/ssh_host_{}_key-cert.pub'.format(key_type),
+ cert_value,
+ 384
+ ),
+ mock.call(
+ '/etc/ssh/sshd_config',
+ ('HostCertificate /etc/ssh/ssh_host_{}_key-cert.pub'
+ '\n'.format(key_type)),
+ preserve_mode=True
+ )
+ ])
+
+ # Run the handler.
+ m_nug.return_value = ([], {})
+ with mock.patch(MODPATH + 'ssh_util.parse_ssh_config',
+ return_value=[]):
+ cc_ssh.handle("name", cfg, self.tmp_cloud(distro='ubuntu'),
+ LOG, None)
+
+ # Check that all expected output has been done.
+ for call_ in expected_calls:
+ self.assertIn(call_, m_write_file.call_args_list)