summaryrefslogtreecommitdiff
path: root/cloudinit/config
diff options
context:
space:
mode:
authorBrett Holman <bholman.devel@gmail.com>2021-12-03 13:11:46 -0700
committerGitHub <noreply@github.com>2021-12-03 13:11:46 -0700
commit039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 (patch)
tree5f1b09486ccaf98ee8159de58d9a2a1ef0af5dc1 /cloudinit/config
parentffa6fc88249aa080aa31811a45569a45e567418a (diff)
downloadvyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.tar.gz
vyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.zip
Reorganize unit test locations under tests/unittests (#1126)
This attempts to standardize unit test file location under test/unittests/ such that any source file located at cloudinit/path/to/file.py may have a corresponding unit test file at test/unittests/path/to/test_file.py. Noteworthy Comments: ==================== Four different duplicate test files existed: test_{gpg,util,cc_mounts,cc_resolv_conf}.py Each of these duplicate file pairs has been merged together. This is a break in git history for these files. The test suite appears to have a dependency on test order. Changing test order causes some tests to fail. This should be rectified, but for now some tests have been modified in tests/unittests/config/test_set_passwords.py. A helper class name starts with "Test" which causes pytest to try executing it as a test case, which then throws warnings "due to Class having __init__()". Silence by changing the name of the class. # helpers.py is imported in many test files, import paths change cloudinit/tests/helpers.py -> tests/unittests/helpers.py # Move directories: cloudinit/distros/tests -> tests/unittests/distros cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel cloudinit/cmd/tests -> tests/unittests/cmd/ cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers cloudinit/sources/tests -> tests/unittests/sources cloudinit/net/tests -> tests/unittests/net cloudinit/config/tests -> tests/unittests/config cloudinit/analyze/tests/ -> tests/unittests/analyze/ # Standardize tests already in tests/unittests/ test_datasource -> sources test_distros -> distros test_vmware -> sources/vmware test_handler -> config # this contains cloudconfig module tests test_runs -> runs
Diffstat (limited to 'cloudinit/config')
-rw-r--r--cloudinit/config/tests/test_apt_pipelining.py28
-rw-r--r--cloudinit/config/tests/test_disable_ec2_metadata.py48
-rw-r--r--cloudinit/config/tests/test_final_message.py46
-rw-r--r--cloudinit/config/tests/test_grub_dpkg.py176
-rw-r--r--cloudinit/config/tests/test_keys_to_console.py34
-rw-r--r--cloudinit/config/tests/test_mounts.py61
-rw-r--r--cloudinit/config/tests/test_resolv_conf.py92
-rw-r--r--cloudinit/config/tests/test_set_passwords.py168
-rw-r--r--cloudinit/config/tests/test_snap.py564
-rw-r--r--cloudinit/config/tests/test_ssh.py405
-rw-r--r--cloudinit/config/tests/test_ubuntu_advantage.py333
-rw-r--r--cloudinit/config/tests/test_ubuntu_drivers.py244
-rw-r--r--cloudinit/config/tests/test_users_groups.py172
13 files changed, 0 insertions, 2371 deletions
diff --git a/cloudinit/config/tests/test_apt_pipelining.py b/cloudinit/config/tests/test_apt_pipelining.py
deleted file mode 100644
index 2a6bb10b..00000000
--- a/cloudinit/config/tests/test_apt_pipelining.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests cc_apt_pipelining handler"""
-
-import cloudinit.config.cc_apt_pipelining as cc_apt_pipelining
-
-from cloudinit.tests.helpers import CiTestCase, mock
-
-
-class TestAptPipelining(CiTestCase):
-
- @mock.patch('cloudinit.config.cc_apt_pipelining.util.write_file')
- def test_not_disabled_by_default(self, m_write_file):
- """ensure that default behaviour is to not disable pipelining"""
- cc_apt_pipelining.handle('foo', {}, None, mock.MagicMock(), None)
- self.assertEqual(0, m_write_file.call_count)
-
- @mock.patch('cloudinit.config.cc_apt_pipelining.util.write_file')
- def test_false_disables_pipelining(self, m_write_file):
- """ensure that pipelining can be disabled with correct config"""
- cc_apt_pipelining.handle(
- 'foo', {'apt_pipelining': 'false'}, None, mock.MagicMock(), None)
- self.assertEqual(1, m_write_file.call_count)
- args, _ = m_write_file.call_args
- self.assertEqual(cc_apt_pipelining.DEFAULT_FILE, args[0])
- self.assertIn('Pipeline-Depth "0"', args[1])
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_disable_ec2_metadata.py b/cloudinit/config/tests/test_disable_ec2_metadata.py
deleted file mode 100644
index b00f2083..00000000
--- a/cloudinit/config/tests/test_disable_ec2_metadata.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests cc_disable_ec2_metadata handler"""
-
-import cloudinit.config.cc_disable_ec2_metadata as ec2_meta
-
-from cloudinit.tests.helpers import CiTestCase, mock
-
-import logging
-
-LOG = logging.getLogger(__name__)
-
-DISABLE_CFG = {'disable_ec2_metadata': 'true'}
-
-
-class TestEC2MetadataRoute(CiTestCase):
-
- @mock.patch('cloudinit.config.cc_disable_ec2_metadata.subp.which')
- @mock.patch('cloudinit.config.cc_disable_ec2_metadata.subp.subp')
- def test_disable_ifconfig(self, m_subp, m_which):
- """Set the route if ifconfig command is available"""
- m_which.side_effect = lambda x: x if x == 'ifconfig' else None
- ec2_meta.handle('foo', DISABLE_CFG, None, LOG, None)
- m_subp.assert_called_with(
- ['route', 'add', '-host', '169.254.169.254', 'reject'],
- capture=False)
-
- @mock.patch('cloudinit.config.cc_disable_ec2_metadata.subp.which')
- @mock.patch('cloudinit.config.cc_disable_ec2_metadata.subp.subp')
- def test_disable_ip(self, m_subp, m_which):
- """Set the route if ip command is available"""
- m_which.side_effect = lambda x: x if x == 'ip' else None
- ec2_meta.handle('foo', DISABLE_CFG, None, LOG, None)
- m_subp.assert_called_with(
- ['ip', 'route', 'add', 'prohibit', '169.254.169.254'],
- capture=False)
-
- @mock.patch('cloudinit.config.cc_disable_ec2_metadata.subp.which')
- @mock.patch('cloudinit.config.cc_disable_ec2_metadata.subp.subp')
- def test_disable_no_tool(self, m_subp, m_which):
- """Log error when neither route nor ip commands are available"""
- m_which.return_value = None # Find neither ifconfig nor ip
- ec2_meta.handle('foo', DISABLE_CFG, None, LOG, None)
- self.assertEqual(
- [mock.call('ip'), mock.call('ifconfig')], m_which.call_args_list)
- m_subp.assert_not_called()
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_final_message.py b/cloudinit/config/tests/test_final_message.py
deleted file mode 100644
index 46ba99b2..00000000
--- a/cloudinit/config/tests/test_final_message.py
+++ /dev/null
@@ -1,46 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-import logging
-from unittest import mock
-
-import pytest
-
-from cloudinit.config.cc_final_message import handle
-
-
-class TestHandle:
- # TODO: Expand these tests to cover full functionality; currently they only
- # cover the logic around how the boot-finished file is written (and not its
- # contents).
-
- @pytest.mark.parametrize(
- "instance_dir_exists,file_is_written,expected_log_substring",
- [
- (True, True, None),
- (False, False, "Failed to write boot finished file "),
- ],
- )
- def test_boot_finished_written(
- self,
- instance_dir_exists,
- file_is_written,
- expected_log_substring,
- caplog,
- tmpdir,
- ):
- instance_dir = tmpdir.join("var/lib/cloud/instance")
- if instance_dir_exists:
- instance_dir.ensure_dir()
- boot_finished = instance_dir.join("boot-finished")
-
- m_cloud = mock.Mock(
- paths=mock.Mock(boot_finished=boot_finished.strpath)
- )
-
- handle(None, {}, m_cloud, logging.getLogger(), [])
-
- # We should not change the status of the instance directory
- assert instance_dir_exists == instance_dir.exists()
- assert file_is_written == boot_finished.exists()
-
- if expected_log_substring:
- assert expected_log_substring in caplog.text
diff --git a/cloudinit/config/tests/test_grub_dpkg.py b/cloudinit/config/tests/test_grub_dpkg.py
deleted file mode 100644
index 99c05bb5..00000000
--- a/cloudinit/config/tests/test_grub_dpkg.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import pytest
-
-from unittest import mock
-from logging import Logger
-from cloudinit.subp import ProcessExecutionError
-from cloudinit.config.cc_grub_dpkg import fetch_idevs, handle
-
-
-class TestFetchIdevs:
- """Tests cc_grub_dpkg.fetch_idevs()"""
-
- # Note: udevadm info returns devices in a large single line string
- @pytest.mark.parametrize(
- "grub_output,path_exists,expected_log_call,udevadm_output"
- ",expected_idevs",
- [
- # Inside a container, grub not installed
- (
- ProcessExecutionError(reason=FileNotFoundError()),
- False,
- mock.call("'grub-probe' not found in $PATH"),
- '',
- '',
- ),
- # Inside a container, grub installed
- (
- ProcessExecutionError(stderr="failed to get canonical path"),
- False,
- mock.call("grub-probe 'failed to get canonical path'"),
- '',
- '',
- ),
- # KVM Instance
- (
- ['/dev/vda'],
- True,
- None,
- (
- '/dev/disk/by-path/pci-0000:00:00.0 ',
- '/dev/disk/by-path/virtio-pci-0000:00:00.0 '
- ),
- '/dev/vda',
- ),
- # Xen Instance
- (
- ['/dev/xvda'],
- True,
- None,
- '',
- '/dev/xvda',
- ),
- # NVMe Hardware Instance
- (
- ['/dev/nvme1n1'],
- True,
- None,
- (
- '/dev/disk/by-id/nvme-Company_hash000 ',
- '/dev/disk/by-id/nvme-nvme.000-000-000-000-000 ',
- '/dev/disk/by-path/pci-0000:00:00.0-nvme-0 '
- ),
- '/dev/disk/by-id/nvme-Company_hash000',
- ),
- # SCSI Hardware Instance
- (
- ['/dev/sda'],
- True,
- None,
- (
- '/dev/disk/by-id/company-user-1 ',
- '/dev/disk/by-id/scsi-0Company_user-1 ',
- '/dev/disk/by-path/pci-0000:00:00.0-scsi-0:0:0:0 '
- ),
- '/dev/disk/by-id/company-user-1',
- ),
- ],
- )
- @mock.patch("cloudinit.config.cc_grub_dpkg.util.logexc")
- @mock.patch("cloudinit.config.cc_grub_dpkg.os.path.exists")
- @mock.patch("cloudinit.config.cc_grub_dpkg.subp.subp")
- def test_fetch_idevs(self, m_subp, m_exists, m_logexc, grub_output,
- path_exists, expected_log_call, udevadm_output,
- expected_idevs):
- """Tests outputs from grub-probe and udevadm info against grub-dpkg"""
- m_subp.side_effect = [
- grub_output,
- ["".join(udevadm_output)]
- ]
- m_exists.return_value = path_exists
- log = mock.Mock(spec=Logger)
- idevs = fetch_idevs(log)
- assert expected_idevs == idevs
- if expected_log_call is not None:
- assert expected_log_call in log.debug.call_args_list
-
-
-class TestHandle:
- """Tests cc_grub_dpkg.handle()"""
-
- @pytest.mark.parametrize(
- "cfg_idevs,cfg_idevs_empty,fetch_idevs_output,expected_log_output",
- [
- (
- # No configuration
- None,
- None,
- '/dev/disk/by-id/nvme-Company_hash000',
- (
- "Setting grub debconf-set-selections with ",
- "'/dev/disk/by-id/nvme-Company_hash000','false'"
- ),
- ),
- (
- # idevs set, idevs_empty unset
- '/dev/sda',
- None,
- '/dev/sda',
- (
- "Setting grub debconf-set-selections with ",
- "'/dev/sda','false'"
- ),
- ),
- (
- # idevs unset, idevs_empty set
- None,
- 'true',
- '/dev/xvda',
- (
- "Setting grub debconf-set-selections with ",
- "'/dev/xvda','true'"
- ),
- ),
- (
- # idevs set, idevs_empty set
- '/dev/vda',
- 'false',
- '/dev/disk/by-id/company-user-1',
- (
- "Setting grub debconf-set-selections with ",
- "'/dev/vda','false'"
- ),
- ),
- (
- # idevs set, idevs_empty set
- # Respect what the user defines, even if its logically wrong
- '/dev/nvme0n1',
- 'true',
- '',
- (
- "Setting grub debconf-set-selections with ",
- "'/dev/nvme0n1','true'"
- ),
- )
- ],
- )
- @mock.patch("cloudinit.config.cc_grub_dpkg.fetch_idevs")
- @mock.patch("cloudinit.config.cc_grub_dpkg.util.get_cfg_option_str")
- @mock.patch("cloudinit.config.cc_grub_dpkg.util.logexc")
- @mock.patch("cloudinit.config.cc_grub_dpkg.subp.subp")
- def test_handle(self, m_subp, m_logexc, m_get_cfg_str, m_fetch_idevs,
- cfg_idevs, cfg_idevs_empty, fetch_idevs_output,
- expected_log_output):
- """Test setting of correct debconf database entries"""
- m_get_cfg_str.side_effect = [
- cfg_idevs,
- cfg_idevs_empty
- ]
- m_fetch_idevs.return_value = fetch_idevs_output
- log = mock.Mock(spec=Logger)
- handle(mock.Mock(), mock.Mock(), mock.Mock(), log, mock.Mock())
- log.debug.assert_called_with("".join(expected_log_output))
-
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_keys_to_console.py b/cloudinit/config/tests/test_keys_to_console.py
deleted file mode 100644
index 4083fc54..00000000
--- a/cloudinit/config/tests/test_keys_to_console.py
+++ /dev/null
@@ -1,34 +0,0 @@
-"""Tests for cc_keys_to_console."""
-from unittest import mock
-
-import pytest
-
-from cloudinit.config import cc_keys_to_console
-
-
-class TestHandle:
- """Tests for cloudinit.config.cc_keys_to_console.handle.
-
- TODO: These tests only cover the emit_keys_to_console config option, they
- should be expanded to cover the full functionality.
- """
-
- @mock.patch("cloudinit.config.cc_keys_to_console.util.multi_log")
- @mock.patch("cloudinit.config.cc_keys_to_console.os.path.exists")
- @mock.patch("cloudinit.config.cc_keys_to_console.subp.subp")
- @pytest.mark.parametrize("cfg,subp_called", [
- ({}, True), # Default to emitting keys
- ({"ssh": {}}, True), # Default even if we have the parent key
- ({"ssh": {"emit_keys_to_console": True}}, True), # Explicitly enabled
- ({"ssh": {"emit_keys_to_console": False}}, False), # Disabled
- ])
- def test_emit_keys_to_console_config(
- self, m_subp, m_path_exists, _m_multi_log, cfg, subp_called
- ):
- # Ensure we always find the helper
- m_path_exists.return_value = True
- m_subp.return_value = ("", "")
-
- cc_keys_to_console.handle("name", cfg, mock.Mock(), mock.Mock(), ())
-
- assert subp_called == (m_subp.call_count == 1)
diff --git a/cloudinit/config/tests/test_mounts.py b/cloudinit/config/tests/test_mounts.py
deleted file mode 100644
index 56510fd6..00000000
--- a/cloudinit/config/tests/test_mounts.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-from unittest import mock
-
-import pytest
-
-from cloudinit.config.cc_mounts import create_swapfile
-from cloudinit.subp import ProcessExecutionError
-
-
-M_PATH = 'cloudinit.config.cc_mounts.'
-
-
-class TestCreateSwapfile:
-
- @pytest.mark.parametrize('fstype', ('xfs', 'btrfs', 'ext4', 'other'))
- @mock.patch(M_PATH + 'util.get_mount_info')
- @mock.patch(M_PATH + 'subp.subp')
- def test_happy_path(self, m_subp, m_get_mount_info, fstype, tmpdir):
- swap_file = tmpdir.join("swap-file")
- fname = str(swap_file)
-
- # Some of the calls to subp.subp should create the swap file; this
- # roughly approximates that
- m_subp.side_effect = lambda *args, **kwargs: swap_file.write('')
-
- m_get_mount_info.return_value = (mock.ANY, fstype)
-
- create_swapfile(fname, '')
- assert mock.call(['mkswap', fname]) in m_subp.call_args_list
-
- @mock.patch(M_PATH + "util.get_mount_info")
- @mock.patch(M_PATH + "subp.subp")
- def test_fallback_from_fallocate_to_dd(
- self, m_subp, m_get_mount_info, caplog, tmpdir
- ):
- swap_file = tmpdir.join("swap-file")
- fname = str(swap_file)
-
- def subp_side_effect(cmd, *args, **kwargs):
- # Mock fallocate failing, to initiate fallback
- if cmd[0] == "fallocate":
- raise ProcessExecutionError()
-
- m_subp.side_effect = subp_side_effect
- # Use ext4 so both fallocate and dd are valid swap creation methods
- m_get_mount_info.return_value = (mock.ANY, "ext4")
-
- create_swapfile(fname, "")
-
- cmds = [args[0][0] for args, _kwargs in m_subp.call_args_list]
- assert "fallocate" in cmds, "fallocate was not called"
- assert "dd" in cmds, "fallocate failure did not fallback to dd"
-
- assert cmds.index("dd") > cmds.index(
- "fallocate"
- ), "dd ran before fallocate"
-
- assert mock.call(["mkswap", fname]) in m_subp.call_args_list
-
- msg = "fallocate swap creation failed, will attempt with dd"
- assert msg in caplog.text
diff --git a/cloudinit/config/tests/test_resolv_conf.py b/cloudinit/config/tests/test_resolv_conf.py
deleted file mode 100644
index aff110e5..00000000
--- a/cloudinit/config/tests/test_resolv_conf.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import pytest
-
-from unittest import mock
-from cloudinit.config.cc_resolv_conf import generate_resolv_conf
-from tests.unittests.util import TestingDistro
-
-EXPECTED_HEADER = """\
-# Your system has been configured with 'manage-resolv-conf' set to true.
-# As a result, cloud-init has written this file with configuration data
-# that it has been provided. Cloud-init, by default, will write this file
-# a single time (PER_ONCE).
-#\n\n"""
-
-
-class TestGenerateResolvConf:
-
- dist = TestingDistro()
- tmpl_fn = "templates/resolv.conf.tmpl"
-
- @mock.patch("cloudinit.config.cc_resolv_conf.templater.render_to_file")
- def test_dist_resolv_conf_fn(self, m_render_to_file):
- self.dist.resolve_conf_fn = "/tmp/resolv-test.conf"
- generate_resolv_conf(self.tmpl_fn,
- mock.MagicMock(),
- self.dist.resolve_conf_fn)
-
- assert [
- mock.call(mock.ANY, self.dist.resolve_conf_fn, mock.ANY)
- ] == m_render_to_file.call_args_list
-
- @mock.patch("cloudinit.config.cc_resolv_conf.templater.render_to_file")
- def test_target_fname_is_used_if_passed(self, m_render_to_file):
- path = "/use/this/path"
- generate_resolv_conf(self.tmpl_fn, mock.MagicMock(), path)
-
- assert [
- mock.call(mock.ANY, path, mock.ANY)
- ] == m_render_to_file.call_args_list
-
- # Patch in templater so we can assert on the actual generated content
- @mock.patch("cloudinit.templater.util.write_file")
- # Parameterise with the value to be passed to generate_resolv_conf as the
- # params parameter, and the expected line after the header as
- # expected_extra_line.
- @pytest.mark.parametrize(
- "params,expected_extra_line",
- [
- # No options
- ({}, None),
- # Just a true flag
- ({"options": {"foo": True}}, "options foo"),
- # Just a false flag
- ({"options": {"foo": False}}, None),
- # Just an option
- ({"options": {"foo": "some_value"}}, "options foo:some_value"),
- # A true flag and an option
- (
- {"options": {"foo": "some_value", "bar": True}},
- "options bar foo:some_value",
- ),
- # Two options
- (
- {"options": {"foo": "some_value", "bar": "other_value"}},
- "options bar:other_value foo:some_value",
- ),
- # Everything
- (
- {
- "options": {
- "foo": "some_value",
- "bar": "other_value",
- "baz": False,
- "spam": True,
- }
- },
- "options spam bar:other_value foo:some_value",
- ),
- ],
- )
- def test_flags_and_options(
- self, m_write_file, params, expected_extra_line
- ):
- target_fn = "/etc/resolv.conf"
- generate_resolv_conf(self.tmpl_fn, params, target_fn)
-
- expected_content = EXPECTED_HEADER
- if expected_extra_line is not None:
- # If we have any extra lines, expect a trailing newline
- expected_content += "\n".join([expected_extra_line, ""])
- assert [
- mock.call(mock.ANY, expected_content, mode=mock.ANY)
- ] == m_write_file.call_args_list
diff --git a/cloudinit/config/tests/test_set_passwords.py b/cloudinit/config/tests/test_set_passwords.py
deleted file mode 100644
index 2a27f72f..00000000
--- a/cloudinit/config/tests/test_set_passwords.py
+++ /dev/null
@@ -1,168 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from unittest import mock
-
-from cloudinit.config import cc_set_passwords as setpass
-from cloudinit.tests.helpers import CiTestCase
-from cloudinit import util
-
-MODPATH = "cloudinit.config.cc_set_passwords."
-
-
-class TestHandleSshPwauth(CiTestCase):
- """Test cc_set_passwords handling of ssh_pwauth in handle_ssh_pwauth."""
-
- with_logs = True
-
- @mock.patch("cloudinit.distros.subp.subp")
- def test_unknown_value_logs_warning(self, m_subp):
- cloud = self.tmp_cloud(distro='ubuntu')
- setpass.handle_ssh_pwauth("floo", cloud.distro)
- self.assertIn("Unrecognized value: ssh_pwauth=floo",
- self.logs.getvalue())
- m_subp.assert_not_called()
-
- @mock.patch(MODPATH + "update_ssh_config", return_value=True)
- @mock.patch("cloudinit.distros.subp.subp")
- def test_systemctl_as_service_cmd(self, m_subp, m_update_ssh_config):
- """If systemctl in service cmd: systemctl restart name."""
- cloud = self.tmp_cloud(distro='ubuntu')
- cloud.distro.init_cmd = ['systemctl']
- setpass.handle_ssh_pwauth(True, cloud.distro)
- m_subp.assert_called_with(
- ["systemctl", "restart", "ssh"], capture=True)
-
- @mock.patch(MODPATH + "update_ssh_config", return_value=False)
- @mock.patch("cloudinit.distros.subp.subp")
- def test_not_restarted_if_not_updated(self, m_subp, m_update_ssh_config):
- """If config is not updated, then no system restart should be done."""
- cloud = self.tmp_cloud(distro='ubuntu')
- setpass.handle_ssh_pwauth(True, cloud.distro)
- m_subp.assert_not_called()
- self.assertIn("No need to restart SSH", self.logs.getvalue())
-
- @mock.patch(MODPATH + "update_ssh_config", return_value=True)
- @mock.patch("cloudinit.distros.subp.subp")
- def test_unchanged_does_nothing(self, m_subp, m_update_ssh_config):
- """If 'unchanged', then no updates to config and no restart."""
- cloud = self.tmp_cloud(distro='ubuntu')
- setpass.handle_ssh_pwauth("unchanged", cloud.distro)
- m_update_ssh_config.assert_not_called()
- m_subp.assert_not_called()
-
- @mock.patch("cloudinit.distros.subp.subp")
- def test_valid_change_values(self, m_subp):
- """If value is a valid changen value, then update should be called."""
- cloud = self.tmp_cloud(distro='ubuntu')
- upname = MODPATH + "update_ssh_config"
- optname = "PasswordAuthentication"
- for value in util.FALSE_STRINGS + util.TRUE_STRINGS:
- optval = "yes" if value in util.TRUE_STRINGS else "no"
- with mock.patch(upname, return_value=False) as m_update:
- setpass.handle_ssh_pwauth(value, cloud.distro)
- m_update.assert_called_with({optname: optval})
- m_subp.assert_not_called()
-
-
-class TestSetPasswordsHandle(CiTestCase):
- """Test cc_set_passwords.handle"""
-
- with_logs = True
-
- def test_handle_on_empty_config(self, *args):
- """handle logs that no password has changed when config is empty."""
- cloud = self.tmp_cloud(distro='ubuntu')
- setpass.handle(
- 'IGNORED', cfg={}, cloud=cloud, log=self.logger, args=[])
- self.assertEqual(
- "DEBUG: Leaving SSH config 'PasswordAuthentication' unchanged. "
- 'ssh_pwauth=None\n',
- self.logs.getvalue())
-
- @mock.patch(MODPATH + "subp.subp")
- def test_handle_on_chpasswd_list_parses_common_hashes(self, m_subp):
- """handle parses command password hashes."""
- cloud = self.tmp_cloud(distro='ubuntu')
- valid_hashed_pwds = [
- 'root:$2y$10$8BQjxjVByHA/Ee.O1bCXtO8S7Y5WojbXWqnqYpUW.BrPx/'
- 'Dlew1Va',
- 'ubuntu:$6$5hOurLPO$naywm3Ce0UlmZg9gG2Fl9acWCVEoakMMC7dR52q'
- 'SDexZbrN9z8yHxhUM2b.sxpguSwOlbOQSW/HpXazGGx3oo1']
- cfg = {'chpasswd': {'list': valid_hashed_pwds}}
- with mock.patch(MODPATH + 'subp.subp') as m_subp:
- setpass.handle(
- 'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
- self.assertIn(
- 'DEBUG: Handling input for chpasswd as list.',
- self.logs.getvalue())
- self.assertIn(
- "DEBUG: Setting hashed password for ['root', 'ubuntu']",
- self.logs.getvalue())
- self.assertEqual(
- [mock.call(['chpasswd', '-e'],
- '\n'.join(valid_hashed_pwds) + '\n')],
- m_subp.call_args_list)
-
- @mock.patch(MODPATH + "util.is_BSD")
- @mock.patch(MODPATH + "subp.subp")
- def test_bsd_calls_custom_pw_cmds_to_set_and_expire_passwords(
- self, m_subp, m_is_bsd):
- """BSD don't use chpasswd"""
- m_is_bsd.return_value = True
- cloud = self.tmp_cloud(distro='freebsd')
- valid_pwds = ['ubuntu:passw0rd']
- cfg = {'chpasswd': {'list': valid_pwds}}
- setpass.handle(
- 'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
- self.assertEqual([
- mock.call(['pw', 'usermod', 'ubuntu', '-h', '0'], data='passw0rd',
- logstring="chpasswd for ubuntu"),
- mock.call(['pw', 'usermod', 'ubuntu', '-p', '01-Jan-1970'])],
- m_subp.call_args_list)
-
- @mock.patch(MODPATH + "util.multi_log")
- @mock.patch(MODPATH + "subp.subp")
- def test_handle_on_chpasswd_list_creates_random_passwords(
- self, m_subp, m_multi_log
- ):
- """handle parses command set random passwords."""
- cloud = self.tmp_cloud(distro='ubuntu')
- valid_random_pwds = [
- 'root:R',
- 'ubuntu:RANDOM']
- cfg = {'chpasswd': {'expire': 'false', 'list': valid_random_pwds}}
- with mock.patch(MODPATH + 'subp.subp') as m_subp:
- setpass.handle(
- 'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
- self.assertIn(
- 'DEBUG: Handling input for chpasswd as list.',
- self.logs.getvalue())
-
- self.assertEqual(1, m_subp.call_count)
- args, _kwargs = m_subp.call_args
- self.assertEqual(["chpasswd"], args[0])
-
- stdin = args[1]
- user_pass = {
- user: password
- for user, password
- in (line.split(":") for line in stdin.splitlines())
- }
-
- self.assertEqual(1, m_multi_log.call_count)
- self.assertEqual(
- mock.call(mock.ANY, stderr=False, fallback_to_stdout=False),
- m_multi_log.call_args
- )
-
- self.assertEqual(set(["root", "ubuntu"]), set(user_pass.keys()))
- written_lines = m_multi_log.call_args[0][0].splitlines()
- for password in user_pass.values():
- for line in written_lines:
- if password in line:
- break
- else:
- self.fail("Password not emitted to console")
-
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py
deleted file mode 100644
index 6d4c014a..00000000
--- a/cloudinit/config/tests/test_snap.py
+++ /dev/null
@@ -1,564 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import re
-from io import StringIO
-
-from cloudinit.config.cc_snap import (
- ASSERTIONS_FILE, add_assertions, handle, maybe_install_squashfuse,
- run_commands, schema)
-from cloudinit.config.schema import validate_cloudconfig_schema
-from cloudinit import util
-from cloudinit.tests.helpers import (
- CiTestCase, SchemaTestCaseMixin, mock, wrap_and_call, skipUnlessJsonSchema)
-
-
-SYSTEM_USER_ASSERTION = """\
-type: system-user
-authority-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
-brand-id: LqvZQdfyfGlYvtep4W6Oj6pFXP9t1Ksp
-email: foo@bar.com
-password: $6$E5YiAuMIPAwX58jG$miomhVNui/vf7f/3ctB/f0RWSKFxG0YXzrJ9rtJ1ikvzt
-series:
-- 16
-since: 2016-09-10T16:34:00+03:00
-until: 2017-11-10T16:34:00+03:00
-username: baz
-sign-key-sha3-384: RuVvnp4n52GilycjfbbTCI3_L8Y6QlIE75wxMc0KzGV3AUQqVd9GuXoj
-
-AcLBXAQAAQoABgUCV/UU1wAKCRBKnlMoJQLkZVeLD/9/+hIeVywtzsDA3oxl+P+u9D13y9s6svP
-Jd6Wnf4FTw6sq1GjBE4ZA7lrwSaRCUJ9Vcsvf2q9OGPY7mOb2TBxaDe0PbUMjrSrqllSSQwhpNI
-zG+NxkkKuxsUmLzFa+k9m6cyojNbw5LFhQZBQCGlr3JYqC0tIREq/UsZxj+90TUC87lDJwkU8GF
-s4CR+rejZj4itIcDcVxCSnJH6hv6j2JrJskJmvObqTnoOlcab+JXdamXqbldSP3UIhWoyVjqzkj
-+to7mXgx+cCUA9+ngNCcfUG+1huGGTWXPCYkZ78HvErcRlIdeo4d3xwtz1cl/w3vYnq9og1XwsP
-Yfetr3boig2qs1Y+j/LpsfYBYncgWjeDfAB9ZZaqQz/oc8n87tIPZDJHrusTlBfop8CqcM4xsKS
-d+wnEY8e/F24mdSOYmS1vQCIDiRU3MKb6x138Ud6oHXFlRBbBJqMMctPqWDunWzb5QJ7YR0I39q
-BrnEqv5NE0G7w6HOJ1LSPG5Hae3P4T2ea+ATgkb03RPr3KnXnzXg4TtBbW1nytdlgoNc/BafE1H
-f3NThcq9gwX4xWZ2PAWnqVPYdDMyCtzW3Ck+o6sIzx+dh4gDLPHIi/6TPe/pUuMop9CBpWwez7V
-v1z+1+URx6Xlq3Jq18y5pZ6fY3IDJ6km2nQPMzcm4Q=="""
-
-ACCOUNT_ASSERTION = """\
-type: account-key
-authority-id: canonical
-revision: 2
-public-key-sha3-384: BWDEoaqyr25nF5SNCvEv2v7QnM9QsfCc0PBMYD_i2NGSQ32EF2d4D0
-account-id: canonical
-name: store
-since: 2016-04-01T00:00:00.0Z
-body-length: 717
-sign-key-sha3-384: -CvQKAwRQ5h3Ffn10FILJoEZUXOv6km9FwA80-Rcj-f-6jadQ89VRswH
-
-AcbBTQRWhcGAARAA0KKYYQWuHOrsFVi4p4l7ZzSvX7kLgJFFeFgOkzdWKBTHEnsMKjl5mefFe9j
-qe8NlmJdfY7BenP7XeBtwKp700H/t9lLrZbpTNAPHXYxEWFJp5bPqIcJYBZ+29oLVLN1Tc5X482
-vCiDqL8+pPYqBrK2fNlyPlNNSum9wI70rDDL4r6FVvr+osTnGejibdV8JphWX+lrSQDnRSdM8KJ
-UM43vTgLGTi9W54oRhsA2OFexRfRksTrnqGoonCjqX5wO3OFSaMDzMsO2MJ/hPfLgDqw53qjzuK
-Iec9OL3k5basvu2cj5u9tKwVFDsCKK2GbKUsWWpx2KTpOifmhmiAbzkTHbH9KaoMS7p0kJwhTQG
-o9aJ9VMTWHJc/NCBx7eu451u6d46sBPCXS/OMUh2766fQmoRtO1OwCTxsRKG2kkjbMn54UdFULl
-VfzvyghMNRKIezsEkmM8wueTqGUGZWa6CEZqZKwhe/PROxOPYzqtDH18XZknbU1n5lNb7vNfem9
-2ai+3+JyFnW9UhfvpVF7gzAgdyCqNli4C6BIN43uwoS8HkykocZS/+Gv52aUQ/NZ8BKOHLw+7an
-Q0o8W9ltSLZbEMxFIPSN0stiZlkXAp6DLyvh1Y4wXSynDjUondTpej2fSvSlCz/W5v5V7qA4nIc
-vUvV7RjVzv17ut0AEQEAAQ==
-
-AcLDXAQAAQoABgUCV83k9QAKCRDUpVvql9g3IBT8IACKZ7XpiBZ3W4lqbPssY6On81WmxQLtvsM
-WTp6zZpl/wWOSt2vMNUk9pvcmrNq1jG9CuhDfWFLGXEjcrrmVkN3YuCOajMSPFCGrxsIBLSRt/b
-nrKykdLAAzMfG8rP1d82bjFFiIieE+urQ0Kcv09Jtdvavq3JT1Tek5mFyyfhHNlQEKOzWqmRWiL
-3c3VOZUs1ZD8TSlnuq/x+5T0X0YtOyGjSlVxk7UybbyMNd6MZfNaMpIG4x+mxD3KHFtBAC7O6kL
-eX3i6j5nCY5UABfA3DZEAkWP4zlmdBEOvZ9t293NaDdOpzsUHRkoi0Zez/9BHQ/kwx/uNc2WqrY
-inCmu16JGNeXqsyinnLl7Ghn2RwhvDMlLxF6RTx8xdx1yk6p3PBTwhZMUvuZGjUtN/AG8BmVJQ1
-rsGSRkkSywvnhVJRB2sudnrMBmNS2goJbzSbmJnOlBrd2WsV0T9SgNMWZBiov3LvU4o2SmAb6b+
-rYwh8H5QHcuuYJuxDjFhPswIp6Wes5T6hUicf3SWtObcDS4HSkVS4ImBjjX9YgCuFy7QdnooOWE
-aPvkRw3XCVeYq0K6w9GRsk1YFErD4XmXXZjDYY650MX9v42Sz5MmphHV8jdIY5ssbadwFSe2rCQ
-6UX08zy7RsIb19hTndE6ncvSNDChUR9eEnCm73eYaWTWTnq1cxdVP/s52r8uss++OYOkPWqh5nO
-haRn7INjH/yZX4qXjNXlTjo0PnHH0q08vNKDwLhxS+D9du+70FeacXFyLIbcWllSbJ7DmbumGpF
-yYbtj3FDDPzachFQdIG3lSt+cSUGeyfSs6wVtc3cIPka/2Urx7RprfmoWSI6+a5NcLdj0u2z8O9
-HxeIgxDpg/3gT8ZIuFKePMcLDM19Fh/p0ysCsX+84B9chNWtsMSmIaE57V+959MVtsLu7SLb9gi
-skrju0pQCwsu2wHMLTNd1f3PTHmrr49hxetTus07HSQUApMtAGKzQilF5zqFjbyaTd4xgQbd+PK
-CjFyzQTDOcUhXpuUGt/IzlqiFfsCsmbj2K4KdSNYMlqIgZ3Azu8KvZLIhsyN7v5vNIZSPfEbjde
-ClU9r0VRiJmtYBUjcSghD9LWn+yRLwOxhfQVjm0cBwIt5R/yPF/qC76yIVuWUtM5Y2/zJR1J8OF
-qWchvlImHtvDzS9FQeLyzJAOjvZ2CnWp2gILgUz0WQdOk1Dq8ax7KS9BQ42zxw9EZAEPw3PEFqR
-IQsRTONp+iVS8YxSmoYZjDlCgRMWUmawez/Fv5b9Fb/XkO5Eq4e+KfrpUujXItaipb+tV8h5v3t
-oG3Ie3WOHrVjCLXIdYslpL1O4nadqR6Xv58pHj6k"""
-
-
-class FakeCloud(object):
- def __init__(self, distro):
- self.distro = distro
-
-
-class TestAddAssertions(CiTestCase):
-
- with_logs = True
-
- def setUp(self):
- super(TestAddAssertions, self).setUp()
- self.tmp = self.tmp_dir()
-
- @mock.patch('cloudinit.config.cc_snap.subp.subp')
- def test_add_assertions_on_empty_list(self, m_subp):
- """When provided with an empty list, add_assertions does nothing."""
- add_assertions([])
- self.assertEqual('', self.logs.getvalue())
- m_subp.assert_not_called()
-
- def test_add_assertions_on_non_list_or_dict(self):
- """When provided an invalid type, add_assertions raises an error."""
- with self.assertRaises(TypeError) as context_manager:
- add_assertions(assertions="I'm Not Valid")
- self.assertEqual(
- "assertion parameter was not a list or dict: I'm Not Valid",
- str(context_manager.exception))
-
- @mock.patch('cloudinit.config.cc_snap.subp.subp')
- def test_add_assertions_adds_assertions_as_list(self, m_subp):
- """When provided with a list, add_assertions adds all assertions."""
- self.assertEqual(
- ASSERTIONS_FILE, '/var/lib/cloud/instance/snapd.assertions')
- assert_file = self.tmp_path('snapd.assertions', dir=self.tmp)
- assertions = [SYSTEM_USER_ASSERTION, ACCOUNT_ASSERTION]
- wrap_and_call(
- 'cloudinit.config.cc_snap',
- {'ASSERTIONS_FILE': {'new': assert_file}},
- add_assertions, assertions)
- self.assertIn(
- 'Importing user-provided snap assertions', self.logs.getvalue())
- self.assertIn(
- 'sertions', self.logs.getvalue())
- self.assertEqual(
- [mock.call(['snap', 'ack', assert_file], capture=True)],
- m_subp.call_args_list)
- compare_file = self.tmp_path('comparison', dir=self.tmp)
- util.write_file(compare_file, '\n'.join(assertions).encode('utf-8'))
- self.assertEqual(
- util.load_file(compare_file), util.load_file(assert_file))
-
- @mock.patch('cloudinit.config.cc_snap.subp.subp')
- def test_add_assertions_adds_assertions_as_dict(self, m_subp):
- """When provided with a dict, add_assertions adds all assertions."""
- self.assertEqual(
- ASSERTIONS_FILE, '/var/lib/cloud/instance/snapd.assertions')
- assert_file = self.tmp_path('snapd.assertions', dir=self.tmp)
- assertions = {'00': SYSTEM_USER_ASSERTION, '01': ACCOUNT_ASSERTION}
- wrap_and_call(
- 'cloudinit.config.cc_snap',
- {'ASSERTIONS_FILE': {'new': assert_file}},
- add_assertions, assertions)
- self.assertIn(
- 'Importing user-provided snap assertions', self.logs.getvalue())
- self.assertIn(
- "DEBUG: Snap acking: ['type: system-user', 'authority-id: Lqv",
- self.logs.getvalue())
- self.assertIn(
- "DEBUG: Snap acking: ['type: account-key', 'authority-id: canonic",
- self.logs.getvalue())
- self.assertEqual(
- [mock.call(['snap', 'ack', assert_file], capture=True)],
- m_subp.call_args_list)
- compare_file = self.tmp_path('comparison', dir=self.tmp)
- combined = '\n'.join(assertions.values())
- util.write_file(compare_file, combined.encode('utf-8'))
- self.assertEqual(
- util.load_file(compare_file), util.load_file(assert_file))
-
-
-class TestRunCommands(CiTestCase):
-
- with_logs = True
- allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
-
- def setUp(self):
- super(TestRunCommands, self).setUp()
- self.tmp = self.tmp_dir()
-
- @mock.patch('cloudinit.config.cc_snap.subp.subp')
- def test_run_commands_on_empty_list(self, m_subp):
- """When provided with an empty list, run_commands does nothing."""
- run_commands([])
- self.assertEqual('', self.logs.getvalue())
- m_subp.assert_not_called()
-
- def test_run_commands_on_non_list_or_dict(self):
- """When provided an invalid type, run_commands raises an error."""
- with self.assertRaises(TypeError) as context_manager:
- run_commands(commands="I'm Not Valid")
- self.assertEqual(
- "commands parameter was not a list or dict: I'm Not Valid",
- str(context_manager.exception))
-
- def test_run_command_logs_commands_and_exit_codes_to_stderr(self):
- """All exit codes are logged to stderr."""
- outfile = self.tmp_path('output.log', dir=self.tmp)
-
- cmd1 = 'echo "HI" >> %s' % outfile
- cmd2 = 'bogus command'
- cmd3 = 'echo "MOM" >> %s' % outfile
- commands = [cmd1, cmd2, cmd3]
-
- mock_path = 'cloudinit.config.cc_snap.sys.stderr'
- with mock.patch(mock_path, new_callable=StringIO) as m_stderr:
- with self.assertRaises(RuntimeError) as context_manager:
- run_commands(commands=commands)
-
- self.assertIsNotNone(
- re.search(r'bogus: (command )?not found',
- str(context_manager.exception)),
- msg='Expected bogus command not found')
- expected_stderr_log = '\n'.join([
- 'Begin run command: {cmd}'.format(cmd=cmd1),
- 'End run command: exit(0)',
- 'Begin run command: {cmd}'.format(cmd=cmd2),
- 'ERROR: End run command: exit(127)',
- 'Begin run command: {cmd}'.format(cmd=cmd3),
- 'End run command: exit(0)\n'])
- self.assertEqual(expected_stderr_log, m_stderr.getvalue())
-
- def test_run_command_as_lists(self):
- """When commands are specified as a list, run them in order."""
- outfile = self.tmp_path('output.log', dir=self.tmp)
-
- cmd1 = 'echo "HI" >> %s' % outfile
- cmd2 = 'echo "MOM" >> %s' % outfile
- commands = [cmd1, cmd2]
- mock_path = 'cloudinit.config.cc_snap.sys.stderr'
- with mock.patch(mock_path, new_callable=StringIO):
- run_commands(commands=commands)
-
- self.assertIn(
- 'DEBUG: Running user-provided snap commands',
- self.logs.getvalue())
- self.assertEqual('HI\nMOM\n', util.load_file(outfile))
- self.assertIn(
- 'WARNING: Non-snap commands in snap config:', self.logs.getvalue())
-
- def test_run_command_dict_sorted_as_command_script(self):
- """When commands are a dict, sort them and run."""
- outfile = self.tmp_path('output.log', dir=self.tmp)
- cmd1 = 'echo "HI" >> %s' % outfile
- cmd2 = 'echo "MOM" >> %s' % outfile
- commands = {'02': cmd1, '01': cmd2}
- mock_path = 'cloudinit.config.cc_snap.sys.stderr'
- with mock.patch(mock_path, new_callable=StringIO):
- run_commands(commands=commands)
-
- expected_messages = [
- 'DEBUG: Running user-provided snap commands']
- for message in expected_messages:
- self.assertIn(message, self.logs.getvalue())
- self.assertEqual('MOM\nHI\n', util.load_file(outfile))
-
-
-@skipUnlessJsonSchema()
-class TestSchema(CiTestCase, SchemaTestCaseMixin):
-
- with_logs = True
- schema = schema
-
- def test_schema_warns_on_snap_not_as_dict(self):
- """If the snap configuration is not a dict, emit a warning."""
- validate_cloudconfig_schema({'snap': 'wrong type'}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nsnap: 'wrong type' is not of type"
- " 'object'\n",
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- def test_schema_disallows_unknown_keys(self, _):
- """Unknown keys in the snap configuration emit warnings."""
- validate_cloudconfig_schema(
- {'snap': {'commands': ['ls'], 'invalid-key': ''}}, schema)
- self.assertIn(
- 'WARNING: Invalid config:\nsnap: Additional properties are not'
- " allowed ('invalid-key' was unexpected)",
- self.logs.getvalue())
-
- def test_warn_schema_requires_either_commands_or_assertions(self):
- """Warn when snap configuration lacks both commands and assertions."""
- validate_cloudconfig_schema(
- {'snap': {}}, schema)
- self.assertIn(
- 'WARNING: Invalid config:\nsnap: {} does not have enough'
- ' properties',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- def test_warn_schema_commands_is_not_list_or_dict(self, _):
- """Warn when snap:commands config is not a list or dict."""
- validate_cloudconfig_schema(
- {'snap': {'commands': 'broken'}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nsnap.commands: 'broken' is not of type"
- " 'object', 'array'\n",
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- def test_warn_schema_when_commands_is_empty(self, _):
- """Emit warnings when snap:commands is an empty list or dict."""
- validate_cloudconfig_schema(
- {'snap': {'commands': []}}, schema)
- validate_cloudconfig_schema(
- {'snap': {'commands': {}}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nsnap.commands: [] is too short\n"
- "WARNING: Invalid config:\nsnap.commands: {} does not have enough"
- " properties\n",
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- def test_schema_when_commands_are_list_or_dict(self, _):
- """No warnings when snap:commands are either a list or dict."""
- validate_cloudconfig_schema(
- {'snap': {'commands': ['valid']}}, schema)
- validate_cloudconfig_schema(
- {'snap': {'commands': {'01': 'also valid'}}}, schema)
- self.assertEqual('', self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- def test_schema_when_commands_values_are_invalid_type(self, _):
- """Warnings when snap:commands values are invalid type (e.g. int)"""
- validate_cloudconfig_schema(
- {'snap': {'commands': [123]}}, schema)
- validate_cloudconfig_schema(
- {'snap': {'commands': {'01': 123}}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\n"
- "snap.commands.0: 123 is not valid under any of the given"
- " schemas\n"
- "WARNING: Invalid config:\n"
- "snap.commands.01: 123 is not valid under any of the given"
- " schemas\n",
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- def test_schema_when_commands_list_values_are_invalid_type(self, _):
- """Warnings when snap:commands list values are wrong type (e.g. int)"""
- validate_cloudconfig_schema(
- {'snap': {'commands': [["snap", "install", 123]]}}, schema)
- validate_cloudconfig_schema(
- {'snap': {'commands': {'01': ["snap", "install", 123]}}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\n"
- "snap.commands.0: ['snap', 'install', 123] is not valid under any"
- " of the given schemas\n",
- "WARNING: Invalid config:\n"
- "snap.commands.0: ['snap', 'install', 123] is not valid under any"
- " of the given schemas\n",
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- def test_schema_when_assertions_values_are_invalid_type(self, _):
- """Warnings when snap:assertions values are invalid type (e.g. int)"""
- validate_cloudconfig_schema(
- {'snap': {'assertions': [123]}}, schema)
- validate_cloudconfig_schema(
- {'snap': {'assertions': {'01': 123}}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\n"
- "snap.assertions.0: 123 is not of type 'string'\n"
- "WARNING: Invalid config:\n"
- "snap.assertions.01: 123 is not of type 'string'\n",
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.add_assertions')
- def test_warn_schema_assertions_is_not_list_or_dict(self, _):
- """Warn when snap:assertions config is not a list or dict."""
- validate_cloudconfig_schema(
- {'snap': {'assertions': 'broken'}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nsnap.assertions: 'broken' is not of"
- " type 'object', 'array'\n",
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.add_assertions')
- def test_warn_schema_when_assertions_is_empty(self, _):
- """Emit warnings when snap:assertions is an empty list or dict."""
- validate_cloudconfig_schema(
- {'snap': {'assertions': []}}, schema)
- validate_cloudconfig_schema(
- {'snap': {'assertions': {}}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nsnap.assertions: [] is too short\n"
- "WARNING: Invalid config:\nsnap.assertions: {} does not have"
- " enough properties\n",
- self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.add_assertions')
- def test_schema_when_assertions_are_list_or_dict(self, _):
- """No warnings when snap:assertions are a list or dict."""
- validate_cloudconfig_schema(
- {'snap': {'assertions': ['valid']}}, schema)
- validate_cloudconfig_schema(
- {'snap': {'assertions': {'01': 'also valid'}}}, schema)
- self.assertEqual('', self.logs.getvalue())
-
- def test_duplicates_are_fine_array_array(self):
- """Duplicated commands array/array entries are allowed."""
- self.assertSchemaValid(
- {'commands': [["echo", "bye"], ["echo", "bye"]]},
- "command entries can be duplicate.")
-
- def test_duplicates_are_fine_array_string(self):
- """Duplicated commands array/string entries are allowed."""
- self.assertSchemaValid(
- {'commands': ["echo bye", "echo bye"]},
- "command entries can be duplicate.")
-
- def test_duplicates_are_fine_dict_array(self):
- """Duplicated commands dict/array entries are allowed."""
- self.assertSchemaValid(
- {'commands': {'00': ["echo", "bye"], '01': ["echo", "bye"]}},
- "command entries can be duplicate.")
-
- def test_duplicates_are_fine_dict_string(self):
- """Duplicated commands dict/string entries are allowed."""
- self.assertSchemaValid(
- {'commands': {'00': "echo bye", '01': "echo bye"}},
- "command entries can be duplicate.")
-
-
-class TestHandle(CiTestCase):
-
- with_logs = True
-
- def setUp(self):
- super(TestHandle, self).setUp()
- self.tmp = self.tmp_dir()
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- @mock.patch('cloudinit.config.cc_snap.add_assertions')
- @mock.patch('cloudinit.config.cc_snap.validate_cloudconfig_schema')
- def test_handle_no_config(self, m_schema, m_add, m_run):
- """When no snap-related configuration is provided, nothing happens."""
- cfg = {}
- handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None)
- self.assertIn(
- "DEBUG: Skipping module named snap, no 'snap' key in config",
- self.logs.getvalue())
- m_schema.assert_not_called()
- m_add.assert_not_called()
- m_run.assert_not_called()
-
- @mock.patch('cloudinit.config.cc_snap.run_commands')
- @mock.patch('cloudinit.config.cc_snap.add_assertions')
- @mock.patch('cloudinit.config.cc_snap.maybe_install_squashfuse')
- def test_handle_skips_squashfuse_when_unconfigured(self, m_squash, m_add,
- m_run):
- """When squashfuse_in_container is unset, don't attempt to install."""
- handle(
- 'snap', cfg={'snap': {}}, cloud=None, log=self.logger, args=None)
- handle(
- 'snap', cfg={'snap': {'squashfuse_in_container': None}},
- cloud=None, log=self.logger, args=None)
- handle(
- 'snap', cfg={'snap': {'squashfuse_in_container': False}},
- cloud=None, log=self.logger, args=None)
- self.assertEqual([], m_squash.call_args_list) # No calls
- # snap configuration missing assertions and commands will default to []
- self.assertIn(mock.call([]), m_add.call_args_list)
- self.assertIn(mock.call([]), m_run.call_args_list)
-
- @mock.patch('cloudinit.config.cc_snap.maybe_install_squashfuse')
- def test_handle_tries_to_install_squashfuse(self, m_squash):
- """If squashfuse_in_container is True, try installing squashfuse."""
- cfg = {'snap': {'squashfuse_in_container': True}}
- mycloud = FakeCloud(None)
- handle('snap', cfg=cfg, cloud=mycloud, log=self.logger, args=None)
- self.assertEqual(
- [mock.call(mycloud)], m_squash.call_args_list)
-
- def test_handle_runs_commands_provided(self):
- """If commands are specified as a list, run them."""
- outfile = self.tmp_path('output.log', dir=self.tmp)
-
- cfg = {
- 'snap': {'commands': ['echo "HI" >> %s' % outfile,
- 'echo "MOM" >> %s' % outfile]}}
- mock_path = 'cloudinit.config.cc_snap.sys.stderr'
- with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]):
- with mock.patch(mock_path, new_callable=StringIO):
- handle('snap', cfg=cfg, cloud=None, log=self.logger, args=None)
-
- self.assertEqual('HI\nMOM\n', util.load_file(outfile))
-
- @mock.patch('cloudinit.config.cc_snap.subp.subp')
- def test_handle_adds_assertions(self, m_subp):
- """Any configured snap assertions are provided to add_assertions."""
- assert_file = self.tmp_path('snapd.assertions', dir=self.tmp)
- compare_file = self.tmp_path('comparison', dir=self.tmp)
- cfg = {
- 'snap': {'assertions': [SYSTEM_USER_ASSERTION, ACCOUNT_ASSERTION]}}
- wrap_and_call(
- 'cloudinit.config.cc_snap',
- {'ASSERTIONS_FILE': {'new': assert_file}},
- handle, 'snap', cfg=cfg, cloud=None, log=self.logger, args=None)
- content = '\n'.join(cfg['snap']['assertions'])
- util.write_file(compare_file, content.encode('utf-8'))
- self.assertEqual(
- util.load_file(compare_file), util.load_file(assert_file))
-
- @mock.patch('cloudinit.config.cc_snap.subp.subp')
- @skipUnlessJsonSchema()
- def test_handle_validates_schema(self, m_subp):
- """Any provided configuration is runs validate_cloudconfig_schema."""
- assert_file = self.tmp_path('snapd.assertions', dir=self.tmp)
- cfg = {'snap': {'invalid': ''}} # Generates schema warning
- wrap_and_call(
- 'cloudinit.config.cc_snap',
- {'ASSERTIONS_FILE': {'new': assert_file}},
- handle, 'snap', cfg=cfg, cloud=None, log=self.logger, args=None)
- self.assertEqual(
- "WARNING: Invalid config:\nsnap: Additional properties are not"
- " allowed ('invalid' was unexpected)\n",
- self.logs.getvalue())
-
-
-class TestMaybeInstallSquashFuse(CiTestCase):
-
- with_logs = True
-
- def setUp(self):
- super(TestMaybeInstallSquashFuse, self).setUp()
- self.tmp = self.tmp_dir()
-
- @mock.patch('cloudinit.config.cc_snap.util.is_container')
- def test_maybe_install_squashfuse_skips_non_containers(self, m_container):
- """maybe_install_squashfuse does nothing when not on a container."""
- m_container.return_value = False
- maybe_install_squashfuse(cloud=FakeCloud(None))
- self.assertEqual([mock.call()], m_container.call_args_list)
- self.assertEqual('', self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.util.is_container')
- def test_maybe_install_squashfuse_raises_install_errors(self, m_container):
- """maybe_install_squashfuse logs and raises package install errors."""
- m_container.return_value = True
- distro = mock.MagicMock()
- distro.update_package_sources.side_effect = RuntimeError(
- 'Some apt error')
- with self.assertRaises(RuntimeError) as context_manager:
- maybe_install_squashfuse(cloud=FakeCloud(distro))
- self.assertEqual('Some apt error', str(context_manager.exception))
- self.assertIn('Package update failed\nTraceback', self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.util.is_container')
- def test_maybe_install_squashfuse_raises_update_errors(self, m_container):
- """maybe_install_squashfuse logs and raises package update errors."""
- m_container.return_value = True
- distro = mock.MagicMock()
- distro.update_package_sources.side_effect = RuntimeError(
- 'Some apt error')
- with self.assertRaises(RuntimeError) as context_manager:
- maybe_install_squashfuse(cloud=FakeCloud(distro))
- self.assertEqual('Some apt error', str(context_manager.exception))
- self.assertIn('Package update failed\nTraceback', self.logs.getvalue())
-
- @mock.patch('cloudinit.config.cc_snap.util.is_container')
- def test_maybe_install_squashfuse_happy_path(self, m_container):
- """maybe_install_squashfuse logs and raises package install errors."""
- m_container.return_value = True
- distro = mock.MagicMock() # No errors raised
- maybe_install_squashfuse(cloud=FakeCloud(distro))
- self.assertEqual(
- [mock.call()], distro.update_package_sources.call_args_list)
- self.assertEqual(
- [mock.call(['squashfuse'])],
- distro.install_packages.call_args_list)
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py
deleted file mode 100644
index 87ccdb60..00000000
--- a/cloudinit/config/tests/test_ssh.py
+++ /dev/null
@@ -1,405 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import os.path
-
-from cloudinit.config import cc_ssh
-from cloudinit import ssh_util
-from cloudinit.tests.helpers import CiTestCase, mock
-import logging
-
-LOG = logging.getLogger(__name__)
-
-MODPATH = "cloudinit.config.cc_ssh."
-KEY_NAMES_NO_DSA = [name for name in cc_ssh.GENERATE_KEY_NAMES
- if name not in 'dsa']
-
-
-@mock.patch(MODPATH + "ssh_util.setup_user_keys")
-class TestHandleSsh(CiTestCase):
- """Test cc_ssh handling of ssh config."""
-
- def _publish_hostkey_test_setup(self):
- self.test_hostkeys = {
- 'dsa': ('ssh-dss', 'AAAAB3NzaC1kc3MAAACB'),
- 'ecdsa': ('ecdsa-sha2-nistp256', 'AAAAE2VjZ'),
- 'ed25519': ('ssh-ed25519', 'AAAAC3NzaC1lZDI'),
- 'rsa': ('ssh-rsa', 'AAAAB3NzaC1yc2EAAA'),
- }
- self.test_hostkey_files = []
- hostkey_tmpdir = self.tmp_dir()
- for key_type in cc_ssh.GENERATE_KEY_NAMES:
- key_data = self.test_hostkeys[key_type]
- filename = 'ssh_host_%s_key.pub' % key_type
- filepath = os.path.join(hostkey_tmpdir, filename)
- self.test_hostkey_files.append(filepath)
- with open(filepath, 'w') as f:
- f.write(' '.join(key_data))
-
- cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, 'ssh_host_%s_key')
-
- def test_apply_credentials_with_user(self, m_setup_keys):
- """Apply keys for the given user and root."""
- keys = ["key1"]
- user = "clouduser"
- cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS)
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_no_user(self, m_setup_keys):
- """Apply keys for root only."""
- keys = ["key1"]
- user = None
- cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS)
- self.assertEqual([mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_user_disable_root(self, m_setup_keys):
- """Apply keys for the given user and disable root ssh."""
- keys = ["key1"]
- user = "clouduser"
- options = ssh_util.DISABLE_USER_OPTS
- cc_ssh.apply_credentials(keys, user, True, options)
- options = options.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- def test_apply_credentials_with_no_user_disable_root(self, m_setup_keys):
- """Apply keys no user and disable root ssh."""
- keys = ["key1"]
- user = None
- options = ssh_util.DISABLE_USER_OPTS
- cc_ssh.apply_credentials(keys, user, True, options)
- options = options.replace("$USER", "NONE")
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_no_cfg(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with no config ignores generating existing keyfiles."""
- cfg = {}
- keys = ["key1"]
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ([], {})
- cc_ssh.PUBLISH_HOST_KEYS = False
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", "NONE")
- options = options.replace("$DISABLE_USER", "root")
- m_glob.assert_called_once_with('/etc/ssh/ssh_host_*key*')
- self.assertIn(
- [mock.call('/etc/ssh/ssh_host_rsa_key'),
- mock.call('/etc/ssh/ssh_host_dsa_key'),
- mock.call('/etc/ssh/ssh_host_ecdsa_key'),
- mock.call('/etc/ssh/ssh_host_ed25519_key')],
- m_path_exists.call_args_list)
- self.assertEqual([mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_dont_allow_public_ssh_keys(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test allow_public_ssh_keys=False ignores ssh public keys from
- platform.
- """
- cfg = {"allow_public_ssh_keys": False}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(), user),
- mock.call(set(), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_no_cfg_and_default_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with no config and a default distro user."""
- cfg = {}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_cfg_with_explicit_disable_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with explicit disable_root and a default distro user."""
- # This test is identical to test_handle_no_cfg_and_default_root,
- # except this uses an explicit cfg value
- cfg = {"disable_root": True}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
- options = options.replace("$DISABLE_USER", "root")
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options=options)],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_cfg_without_disable_root(self, m_path_exists, m_nug,
- m_glob, m_setup_keys):
- """Test handle with disable_root == False."""
- # When disable_root == False, the ssh redirect for root is skipped
- cfg = {"disable_root": False}
- keys = ["key1"]
- user = "clouduser"
- m_glob.return_value = [] # Return no matching keys to prevent removal
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
- cc_ssh.handle("name", cfg, cloud, LOG, None)
-
- self.assertEqual([mock.call(set(keys), user),
- mock.call(set(keys), "root", options="")],
- m_setup_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_default(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in KEY_NAMES_NO_DSA]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_enable(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = False
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': True}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in KEY_NAMES_NO_DSA]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_disable(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': False}}
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertFalse(cloud.datasource.publish_host_keys.call_args_list)
- cloud.datasource.publish_host_keys.assert_not_called()
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_config_blacklist(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': True,
- 'blacklist': ['dsa', 'rsa']}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in ['ecdsa', 'ed25519']]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "glob.glob")
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "os.path.exists")
- def test_handle_publish_hostkeys_empty_blacklist(
- self, m_path_exists, m_nug, m_glob, m_setup_keys):
- """Test handle with various configs for ssh_publish_hostkeys."""
- self._publish_hostkey_test_setup()
- cc_ssh.PUBLISH_HOST_KEYS = True
- keys = ["key1"]
- user = "clouduser"
- # Return no matching keys for first glob, test keys for second.
- m_glob.side_effect = iter([
- [],
- self.test_hostkey_files,
- ])
- # Mock os.path.exits to True to short-circuit the key writing logic
- m_path_exists.return_value = True
- m_nug.return_value = ({user: {"default": user}}, {})
- cloud = self.tmp_cloud(
- distro='ubuntu', metadata={'public-keys': keys})
- cloud.datasource.publish_host_keys = mock.Mock()
-
- cfg = {'ssh_publish_hostkeys': {'enabled': True,
- 'blacklist': []}}
- expected_call = [self.test_hostkeys[key_type] for key_type
- in cc_ssh.GENERATE_KEY_NAMES]
- cc_ssh.handle("name", cfg, cloud, LOG, None)
- self.assertEqual([mock.call(expected_call)],
- cloud.datasource.publish_host_keys.call_args_list)
-
- @mock.patch(MODPATH + "ug_util.normalize_users_groups")
- @mock.patch(MODPATH + "util.write_file")
- def test_handle_ssh_keys_in_cfg(self, m_write_file, m_nug, m_setup_keys):
- """Test handle with ssh keys and certificate."""
- # Populate a config dictionary to pass to handle() as well
- # as the expected file-writing calls.
- cfg = {"ssh_keys": {}}
-
- expected_calls = []
- for key_type in cc_ssh.GENERATE_KEY_NAMES:
- private_name = "{}_private".format(key_type)
- public_name = "{}_public".format(key_type)
- cert_name = "{}_certificate".format(key_type)
-
- # Actual key contents don"t have to be realistic
- private_value = "{}_PRIVATE_KEY".format(key_type)
- public_value = "{}_PUBLIC_KEY".format(key_type)
- cert_value = "{}_CERT_KEY".format(key_type)
-
- cfg["ssh_keys"][private_name] = private_value
- cfg["ssh_keys"][public_name] = public_value
- cfg["ssh_keys"][cert_name] = cert_value
-
- expected_calls.extend([
- mock.call(
- '/etc/ssh/ssh_host_{}_key'.format(key_type),
- private_value,
- 384
- ),
- mock.call(
- '/etc/ssh/ssh_host_{}_key.pub'.format(key_type),
- public_value,
- 384
- ),
- mock.call(
- '/etc/ssh/ssh_host_{}_key-cert.pub'.format(key_type),
- cert_value,
- 384
- ),
- mock.call(
- '/etc/ssh/sshd_config',
- ('HostCertificate /etc/ssh/ssh_host_{}_key-cert.pub'
- '\n'.format(key_type)),
- preserve_mode=True
- )
- ])
-
- # Run the handler.
- m_nug.return_value = ([], {})
- with mock.patch(MODPATH + 'ssh_util.parse_ssh_config',
- return_value=[]):
- cc_ssh.handle("name", cfg, self.tmp_cloud(distro='ubuntu'),
- LOG, None)
-
- # Check that all expected output has been done.
- for call_ in expected_calls:
- self.assertIn(call_, m_write_file.call_args_list)
diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py
deleted file mode 100644
index db7fb726..00000000
--- a/cloudinit/config/tests/test_ubuntu_advantage.py
+++ /dev/null
@@ -1,333 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from cloudinit.config.cc_ubuntu_advantage import (
- configure_ua, handle, maybe_install_ua_tools, schema)
-from cloudinit.config.schema import validate_cloudconfig_schema
-from cloudinit import subp
-from cloudinit.tests.helpers import (
- CiTestCase, mock, SchemaTestCaseMixin, skipUnlessJsonSchema)
-
-
-# Module path used in mocks
-MPATH = 'cloudinit.config.cc_ubuntu_advantage'
-
-
-class FakeCloud(object):
- def __init__(self, distro):
- self.distro = distro
-
-
-class TestConfigureUA(CiTestCase):
-
- with_logs = True
- allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
-
- def setUp(self):
- super(TestConfigureUA, self).setUp()
- self.tmp = self.tmp_dir()
-
- @mock.patch('%s.subp.subp' % MPATH)
- def test_configure_ua_attach_error(self, m_subp):
- """Errors from ua attach command are raised."""
- m_subp.side_effect = subp.ProcessExecutionError(
- 'Invalid token SomeToken')
- with self.assertRaises(RuntimeError) as context_manager:
- configure_ua(token='SomeToken')
- self.assertEqual(
- 'Failure attaching Ubuntu Advantage:\nUnexpected error while'
- ' running command.\nCommand: -\nExit code: -\nReason: -\n'
- 'Stdout: Invalid token SomeToken\nStderr: -',
- str(context_manager.exception))
-
- @mock.patch('%s.subp.subp' % MPATH)
- def test_configure_ua_attach_with_token(self, m_subp):
- """When token is provided, attach the machine to ua using the token."""
- configure_ua(token='SomeToken')
- m_subp.assert_called_once_with(['ua', 'attach', 'SomeToken'])
- self.assertEqual(
- 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
- self.logs.getvalue())
-
- @mock.patch('%s.subp.subp' % MPATH)
- def test_configure_ua_attach_on_service_error(self, m_subp):
- """all services should be enabled and then any failures raised"""
-
- def fake_subp(cmd, capture=None):
- fail_cmds = [['ua', 'enable', svc] for svc in ['esm', 'cc']]
- if cmd in fail_cmds and capture:
- svc = cmd[-1]
- raise subp.ProcessExecutionError(
- 'Invalid {} credentials'.format(svc.upper()))
-
- m_subp.side_effect = fake_subp
-
- with self.assertRaises(RuntimeError) as context_manager:
- configure_ua(token='SomeToken', enable=['esm', 'cc', 'fips'])
- self.assertEqual(
- m_subp.call_args_list,
- [mock.call(['ua', 'attach', 'SomeToken']),
- mock.call(['ua', 'enable', 'esm'], capture=True),
- mock.call(['ua', 'enable', 'cc'], capture=True),
- mock.call(['ua', 'enable', 'fips'], capture=True)])
- self.assertIn(
- 'WARNING: Failure enabling "esm":\nUnexpected error'
- ' while running command.\nCommand: -\nExit code: -\nReason: -\n'
- 'Stdout: Invalid ESM credentials\nStderr: -\n',
- self.logs.getvalue())
- self.assertIn(
- 'WARNING: Failure enabling "cc":\nUnexpected error'
- ' while running command.\nCommand: -\nExit code: -\nReason: -\n'
- 'Stdout: Invalid CC credentials\nStderr: -\n',
- self.logs.getvalue())
- self.assertEqual(
- 'Failure enabling Ubuntu Advantage service(s): "esm", "cc"',
- str(context_manager.exception))
-
- @mock.patch('%s.subp.subp' % MPATH)
- def test_configure_ua_attach_with_empty_services(self, m_subp):
- """When services is an empty list, do not auto-enable attach."""
- configure_ua(token='SomeToken', enable=[])
- m_subp.assert_called_once_with(['ua', 'attach', 'SomeToken'])
- self.assertEqual(
- 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
- self.logs.getvalue())
-
- @mock.patch('%s.subp.subp' % MPATH)
- def test_configure_ua_attach_with_specific_services(self, m_subp):
- """When services a list, only enable specific services."""
- configure_ua(token='SomeToken', enable=['fips'])
- self.assertEqual(
- m_subp.call_args_list,
- [mock.call(['ua', 'attach', 'SomeToken']),
- mock.call(['ua', 'enable', 'fips'], capture=True)])
- self.assertEqual(
- 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
- self.logs.getvalue())
-
- @mock.patch('%s.maybe_install_ua_tools' % MPATH, mock.MagicMock())
- @mock.patch('%s.subp.subp' % MPATH)
- def test_configure_ua_attach_with_string_services(self, m_subp):
- """When services a string, treat as singleton list and warn"""
- configure_ua(token='SomeToken', enable='fips')
- self.assertEqual(
- m_subp.call_args_list,
- [mock.call(['ua', 'attach', 'SomeToken']),
- mock.call(['ua', 'enable', 'fips'], capture=True)])
- self.assertEqual(
- 'WARNING: ubuntu_advantage: enable should be a list, not a'
- ' string; treating as a single enable\n'
- 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
- self.logs.getvalue())
-
- @mock.patch('%s.subp.subp' % MPATH)
- def test_configure_ua_attach_with_weird_services(self, m_subp):
- """When services not string or list, warn but still attach"""
- configure_ua(token='SomeToken', enable={'deffo': 'wont work'})
- self.assertEqual(
- m_subp.call_args_list,
- [mock.call(['ua', 'attach', 'SomeToken'])])
- self.assertEqual(
- 'WARNING: ubuntu_advantage: enable should be a list, not a'
- ' dict; skipping enabling services\n'
- 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
- self.logs.getvalue())
-
-
-@skipUnlessJsonSchema()
-class TestSchema(CiTestCase, SchemaTestCaseMixin):
-
- with_logs = True
- schema = schema
-
- @mock.patch('%s.maybe_install_ua_tools' % MPATH)
- @mock.patch('%s.configure_ua' % MPATH)
- def test_schema_warns_on_ubuntu_advantage_not_dict(self, _cfg, _):
- """If ubuntu_advantage configuration is not a dict, emit a warning."""
- validate_cloudconfig_schema({'ubuntu_advantage': 'wrong type'}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nubuntu_advantage: 'wrong type' is not"
- " of type 'object'\n",
- self.logs.getvalue())
-
- @mock.patch('%s.maybe_install_ua_tools' % MPATH)
- @mock.patch('%s.configure_ua' % MPATH)
- def test_schema_disallows_unknown_keys(self, _cfg, _):
- """Unknown keys in ubuntu_advantage configuration emit warnings."""
- validate_cloudconfig_schema(
- {'ubuntu_advantage': {'token': 'winner', 'invalid-key': ''}},
- schema)
- self.assertIn(
- 'WARNING: Invalid config:\nubuntu_advantage: Additional properties'
- " are not allowed ('invalid-key' was unexpected)",
- self.logs.getvalue())
-
- @mock.patch('%s.maybe_install_ua_tools' % MPATH)
- @mock.patch('%s.configure_ua' % MPATH)
- def test_warn_schema_requires_token(self, _cfg, _):
- """Warn if ubuntu_advantage configuration lacks token."""
- validate_cloudconfig_schema(
- {'ubuntu_advantage': {'enable': ['esm']}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nubuntu_advantage:"
- " 'token' is a required property\n", self.logs.getvalue())
-
- @mock.patch('%s.maybe_install_ua_tools' % MPATH)
- @mock.patch('%s.configure_ua' % MPATH)
- def test_warn_schema_services_is_not_list_or_dict(self, _cfg, _):
- """Warn when ubuntu_advantage:enable config is not a list."""
- validate_cloudconfig_schema(
- {'ubuntu_advantage': {'enable': 'needslist'}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nubuntu_advantage: 'token' is a"
- " required property\nubuntu_advantage.enable: 'needslist'"
- " is not of type 'array'\n",
- self.logs.getvalue())
-
-
-class TestHandle(CiTestCase):
-
- with_logs = True
-
- def setUp(self):
- super(TestHandle, self).setUp()
- self.tmp = self.tmp_dir()
-
- @mock.patch('%s.validate_cloudconfig_schema' % MPATH)
- def test_handle_no_config(self, m_schema):
- """When no ua-related configuration is provided, nothing happens."""
- cfg = {}
- handle('ua-test', cfg=cfg, cloud=None, log=self.logger, args=None)
- self.assertIn(
- "DEBUG: Skipping module named ua-test, no 'ubuntu_advantage'"
- ' configuration found',
- self.logs.getvalue())
- m_schema.assert_not_called()
-
- @mock.patch('%s.configure_ua' % MPATH)
- @mock.patch('%s.maybe_install_ua_tools' % MPATH)
- def test_handle_tries_to_install_ubuntu_advantage_tools(
- self, m_install, m_cfg):
- """If ubuntu_advantage is provided, try installing ua-tools package."""
- cfg = {'ubuntu_advantage': {'token': 'valid'}}
- mycloud = FakeCloud(None)
- handle('nomatter', cfg=cfg, cloud=mycloud, log=self.logger, args=None)
- m_install.assert_called_once_with(mycloud)
-
- @mock.patch('%s.configure_ua' % MPATH)
- @mock.patch('%s.maybe_install_ua_tools' % MPATH)
- def test_handle_passes_credentials_and_services_to_configure_ua(
- self, m_install, m_configure_ua):
- """All ubuntu_advantage config keys are passed to configure_ua."""
- cfg = {'ubuntu_advantage': {'token': 'token', 'enable': ['esm']}}
- handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
- m_configure_ua.assert_called_once_with(
- token='token', enable=['esm'])
-
- @mock.patch('%s.maybe_install_ua_tools' % MPATH, mock.MagicMock())
- @mock.patch('%s.configure_ua' % MPATH)
- def test_handle_warns_on_deprecated_ubuntu_advantage_key_w_config(
- self, m_configure_ua):
- """Warning when ubuntu-advantage key is present with new config"""
- cfg = {'ubuntu-advantage': {'token': 'token', 'enable': ['esm']}}
- handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
- self.assertEqual(
- 'WARNING: Deprecated configuration key "ubuntu-advantage"'
- ' provided. Expected underscore delimited "ubuntu_advantage";'
- ' will attempt to continue.',
- self.logs.getvalue().splitlines()[0])
- m_configure_ua.assert_called_once_with(
- token='token', enable=['esm'])
-
- def test_handle_error_on_deprecated_commands_key_dashed(self):
- """Error when commands is present in ubuntu-advantage key."""
- cfg = {'ubuntu-advantage': {'commands': 'nogo'}}
- with self.assertRaises(RuntimeError) as context_manager:
- handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
- self.assertEqual(
- 'Deprecated configuration "ubuntu-advantage: commands" provided.'
- ' Expected "token"',
- str(context_manager.exception))
-
- def test_handle_error_on_deprecated_commands_key_underscored(self):
- """Error when commands is present in ubuntu_advantage key."""
- cfg = {'ubuntu_advantage': {'commands': 'nogo'}}
- with self.assertRaises(RuntimeError) as context_manager:
- handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
- self.assertEqual(
- 'Deprecated configuration "ubuntu-advantage: commands" provided.'
- ' Expected "token"',
- str(context_manager.exception))
-
- @mock.patch('%s.maybe_install_ua_tools' % MPATH, mock.MagicMock())
- @mock.patch('%s.configure_ua' % MPATH)
- def test_handle_prefers_new_style_config(
- self, m_configure_ua):
- """ubuntu_advantage should be preferred over ubuntu-advantage"""
- cfg = {
- 'ubuntu-advantage': {'token': 'nope', 'enable': ['wrong']},
- 'ubuntu_advantage': {'token': 'token', 'enable': ['esm']},
- }
- handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
- self.assertEqual(
- 'WARNING: Deprecated configuration key "ubuntu-advantage"'
- ' provided. Expected underscore delimited "ubuntu_advantage";'
- ' will attempt to continue.',
- self.logs.getvalue().splitlines()[0])
- m_configure_ua.assert_called_once_with(
- token='token', enable=['esm'])
-
-
-class TestMaybeInstallUATools(CiTestCase):
-
- with_logs = True
-
- def setUp(self):
- super(TestMaybeInstallUATools, self).setUp()
- self.tmp = self.tmp_dir()
-
- @mock.patch('%s.subp.which' % MPATH)
- def test_maybe_install_ua_tools_noop_when_ua_tools_present(self, m_which):
- """Do nothing if ubuntu-advantage-tools already exists."""
- m_which.return_value = '/usr/bin/ua' # already installed
- distro = mock.MagicMock()
- distro.update_package_sources.side_effect = RuntimeError(
- 'Some apt error')
- maybe_install_ua_tools(cloud=FakeCloud(distro)) # No RuntimeError
-
- @mock.patch('%s.subp.which' % MPATH)
- def test_maybe_install_ua_tools_raises_update_errors(self, m_which):
- """maybe_install_ua_tools logs and raises apt update errors."""
- m_which.return_value = None
- distro = mock.MagicMock()
- distro.update_package_sources.side_effect = RuntimeError(
- 'Some apt error')
- with self.assertRaises(RuntimeError) as context_manager:
- maybe_install_ua_tools(cloud=FakeCloud(distro))
- self.assertEqual('Some apt error', str(context_manager.exception))
- self.assertIn('Package update failed\nTraceback', self.logs.getvalue())
-
- @mock.patch('%s.subp.which' % MPATH)
- def test_maybe_install_ua_raises_install_errors(self, m_which):
- """maybe_install_ua_tools logs and raises package install errors."""
- m_which.return_value = None
- distro = mock.MagicMock()
- distro.update_package_sources.return_value = None
- distro.install_packages.side_effect = RuntimeError(
- 'Some install error')
- with self.assertRaises(RuntimeError) as context_manager:
- maybe_install_ua_tools(cloud=FakeCloud(distro))
- self.assertEqual('Some install error', str(context_manager.exception))
- self.assertIn(
- 'Failed to install ubuntu-advantage-tools\n', self.logs.getvalue())
-
- @mock.patch('%s.subp.which' % MPATH)
- def test_maybe_install_ua_tools_happy_path(self, m_which):
- """maybe_install_ua_tools installs ubuntu-advantage-tools."""
- m_which.return_value = None
- distro = mock.MagicMock() # No errors raised
- maybe_install_ua_tools(cloud=FakeCloud(distro))
- distro.update_package_sources.assert_called_once_with()
- distro.install_packages.assert_called_once_with(
- ['ubuntu-advantage-tools'])
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_ubuntu_drivers.py b/cloudinit/config/tests/test_ubuntu_drivers.py
deleted file mode 100644
index 504ba356..00000000
--- a/cloudinit/config/tests/test_ubuntu_drivers.py
+++ /dev/null
@@ -1,244 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import copy
-import os
-
-from cloudinit.tests.helpers import CiTestCase, skipUnlessJsonSchema, mock
-from cloudinit.config.schema import (
- SchemaValidationError, validate_cloudconfig_schema)
-from cloudinit.config import cc_ubuntu_drivers as drivers
-from cloudinit.subp import ProcessExecutionError
-
-MPATH = "cloudinit.config.cc_ubuntu_drivers."
-M_TMP_PATH = MPATH + "temp_utils.mkdtemp"
-OLD_UBUNTU_DRIVERS_ERROR_STDERR = (
- "ubuntu-drivers: error: argument <command>: invalid choice: 'install' "
- "(choose from 'list', 'autoinstall', 'devices', 'debug')\n")
-
-
-# The tests in this module call helper methods which are decorated with
-# mock.patch. pylint doesn't understand that mock.patch passes parameters to
-# the decorated function, so it incorrectly reports that we aren't passing
-# values for all parameters. Instead of annotating every single call, we
-# disable it for the entire module:
-# pylint: disable=no-value-for-parameter
-
-class AnyTempScriptAndDebconfFile(object):
-
- def __init__(self, tmp_dir, debconf_file):
- self.tmp_dir = tmp_dir
- self.debconf_file = debconf_file
-
- def __eq__(self, cmd):
- if not len(cmd) == 2:
- return False
- script, debconf_file = cmd
- if bool(script.startswith(self.tmp_dir) and script.endswith('.sh')):
- return debconf_file == self.debconf_file
- return False
-
-
-class TestUbuntuDrivers(CiTestCase):
- cfg_accepted = {'drivers': {'nvidia': {'license-accepted': True}}}
- install_gpgpu = ['ubuntu-drivers', 'install', '--gpgpu', 'nvidia']
-
- with_logs = True
-
- @skipUnlessJsonSchema()
- def test_schema_requires_boolean_for_license_accepted(self):
- with self.assertRaisesRegex(
- SchemaValidationError, ".*license-accepted.*TRUE.*boolean"):
- validate_cloudconfig_schema(
- {'drivers': {'nvidia': {'license-accepted': "TRUE"}}},
- schema=drivers.schema, strict=True)
-
- @mock.patch(M_TMP_PATH)
- @mock.patch(MPATH + "subp.subp", return_value=('', ''))
- @mock.patch(MPATH + "subp.which", return_value=False)
- def _assert_happy_path_taken(
- self, config, m_which, m_subp, m_tmp):
- """Positive path test through handle. Package should be installed."""
- tdir = self.tmp_dir()
- debconf_file = os.path.join(tdir, 'nvidia.template')
- m_tmp.return_value = tdir
- myCloud = mock.MagicMock()
- drivers.handle('ubuntu_drivers', config, myCloud, None, None)
- self.assertEqual([mock.call(['ubuntu-drivers-common'])],
- myCloud.distro.install_packages.call_args_list)
- self.assertEqual(
- [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
- mock.call(self.install_gpgpu)],
- m_subp.call_args_list)
-
- def test_handle_does_package_install(self):
- self._assert_happy_path_taken(self.cfg_accepted)
-
- def test_trueish_strings_are_considered_approval(self):
- for true_value in ['yes', 'true', 'on', '1']:
- new_config = copy.deepcopy(self.cfg_accepted)
- new_config['drivers']['nvidia']['license-accepted'] = true_value
- self._assert_happy_path_taken(new_config)
-
- @mock.patch(M_TMP_PATH)
- @mock.patch(MPATH + "subp.subp")
- @mock.patch(MPATH + "subp.which", return_value=False)
- def test_handle_raises_error_if_no_drivers_found(
- self, m_which, m_subp, m_tmp):
- """If ubuntu-drivers doesn't install any drivers, raise an error."""
- tdir = self.tmp_dir()
- debconf_file = os.path.join(tdir, 'nvidia.template')
- m_tmp.return_value = tdir
- myCloud = mock.MagicMock()
-
- def fake_subp(cmd):
- if cmd[0].startswith(tdir):
- return
- raise ProcessExecutionError(
- stdout='No drivers found for installation.\n', exit_code=1)
- m_subp.side_effect = fake_subp
-
- with self.assertRaises(Exception):
- drivers.handle(
- 'ubuntu_drivers', self.cfg_accepted, myCloud, None, None)
- self.assertEqual([mock.call(['ubuntu-drivers-common'])],
- myCloud.distro.install_packages.call_args_list)
- self.assertEqual(
- [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
- mock.call(self.install_gpgpu)],
- m_subp.call_args_list)
- self.assertIn('ubuntu-drivers found no drivers for installation',
- self.logs.getvalue())
-
- @mock.patch(MPATH + "subp.subp", return_value=('', ''))
- @mock.patch(MPATH + "subp.which", return_value=False)
- def _assert_inert_with_config(self, config, m_which, m_subp):
- """Helper to reduce repetition when testing negative cases"""
- myCloud = mock.MagicMock()
- drivers.handle('ubuntu_drivers', config, myCloud, None, None)
- self.assertEqual(0, myCloud.distro.install_packages.call_count)
- self.assertEqual(0, m_subp.call_count)
-
- def test_handle_inert_if_license_not_accepted(self):
- """Ensure we don't do anything if the license is rejected."""
- self._assert_inert_with_config(
- {'drivers': {'nvidia': {'license-accepted': False}}})
-
- def test_handle_inert_if_garbage_in_license_field(self):
- """Ensure we don't do anything if unknown text is in license field."""
- self._assert_inert_with_config(
- {'drivers': {'nvidia': {'license-accepted': 'garbage'}}})
-
- def test_handle_inert_if_no_license_key(self):
- """Ensure we don't do anything if no license key."""
- self._assert_inert_with_config({'drivers': {'nvidia': {}}})
-
- def test_handle_inert_if_no_nvidia_key(self):
- """Ensure we don't do anything if other license accepted."""
- self._assert_inert_with_config(
- {'drivers': {'acme': {'license-accepted': True}}})
-
- def test_handle_inert_if_string_given(self):
- """Ensure we don't do anything if string refusal given."""
- for false_value in ['no', 'false', 'off', '0']:
- self._assert_inert_with_config(
- {'drivers': {'nvidia': {'license-accepted': false_value}}})
-
- @mock.patch(MPATH + "install_drivers")
- def test_handle_no_drivers_does_nothing(self, m_install_drivers):
- """If no 'drivers' key in the config, nothing should be done."""
- myCloud = mock.MagicMock()
- myLog = mock.MagicMock()
- drivers.handle('ubuntu_drivers', {'foo': 'bzr'}, myCloud, myLog, None)
- self.assertIn('Skipping module named',
- myLog.debug.call_args_list[0][0][0])
- self.assertEqual(0, m_install_drivers.call_count)
-
- @mock.patch(M_TMP_PATH)
- @mock.patch(MPATH + "subp.subp", return_value=('', ''))
- @mock.patch(MPATH + "subp.which", return_value=True)
- def test_install_drivers_no_install_if_present(
- self, m_which, m_subp, m_tmp):
- """If 'ubuntu-drivers' is present, no package install should occur."""
- tdir = self.tmp_dir()
- debconf_file = os.path.join(tdir, 'nvidia.template')
- m_tmp.return_value = tdir
- pkg_install = mock.MagicMock()
- drivers.install_drivers(self.cfg_accepted['drivers'],
- pkg_install_func=pkg_install)
- self.assertEqual(0, pkg_install.call_count)
- self.assertEqual([mock.call('ubuntu-drivers')],
- m_which.call_args_list)
- self.assertEqual(
- [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
- mock.call(self.install_gpgpu)],
- m_subp.call_args_list)
-
- def test_install_drivers_rejects_invalid_config(self):
- """install_drivers should raise TypeError if not given a config dict"""
- pkg_install = mock.MagicMock()
- with self.assertRaisesRegex(TypeError, ".*expected dict.*"):
- drivers.install_drivers("mystring", pkg_install_func=pkg_install)
- self.assertEqual(0, pkg_install.call_count)
-
- @mock.patch(M_TMP_PATH)
- @mock.patch(MPATH + "subp.subp")
- @mock.patch(MPATH + "subp.which", return_value=False)
- def test_install_drivers_handles_old_ubuntu_drivers_gracefully(
- self, m_which, m_subp, m_tmp):
- """Older ubuntu-drivers versions should emit message and raise error"""
- tdir = self.tmp_dir()
- debconf_file = os.path.join(tdir, 'nvidia.template')
- m_tmp.return_value = tdir
- myCloud = mock.MagicMock()
-
- def fake_subp(cmd):
- if cmd[0].startswith(tdir):
- return
- raise ProcessExecutionError(
- stderr=OLD_UBUNTU_DRIVERS_ERROR_STDERR, exit_code=2)
- m_subp.side_effect = fake_subp
-
- with self.assertRaises(Exception):
- drivers.handle(
- 'ubuntu_drivers', self.cfg_accepted, myCloud, None, None)
- self.assertEqual([mock.call(['ubuntu-drivers-common'])],
- myCloud.distro.install_packages.call_args_list)
- self.assertEqual(
- [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
- mock.call(self.install_gpgpu)],
- m_subp.call_args_list)
- self.assertIn('WARNING: the available version of ubuntu-drivers is'
- ' too old to perform requested driver installation',
- self.logs.getvalue())
-
-
-# Sub-class TestUbuntuDrivers to run the same test cases, but with a version
-class TestUbuntuDriversWithVersion(TestUbuntuDrivers):
- cfg_accepted = {
- 'drivers': {'nvidia': {'license-accepted': True, 'version': '123'}}}
- install_gpgpu = ['ubuntu-drivers', 'install', '--gpgpu', 'nvidia:123']
-
- @mock.patch(M_TMP_PATH)
- @mock.patch(MPATH + "subp.subp", return_value=('', ''))
- @mock.patch(MPATH + "subp.which", return_value=False)
- def test_version_none_uses_latest(self, m_which, m_subp, m_tmp):
- tdir = self.tmp_dir()
- debconf_file = os.path.join(tdir, 'nvidia.template')
- m_tmp.return_value = tdir
- myCloud = mock.MagicMock()
- version_none_cfg = {
- 'drivers': {'nvidia': {'license-accepted': True, 'version': None}}}
- drivers.handle(
- 'ubuntu_drivers', version_none_cfg, myCloud, None, None)
- self.assertEqual(
- [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
- mock.call(['ubuntu-drivers', 'install', '--gpgpu', 'nvidia'])],
- m_subp.call_args_list)
-
- def test_specifying_a_version_doesnt_override_license_acceptance(self):
- self._assert_inert_with_config({
- 'drivers': {'nvidia': {'license-accepted': False,
- 'version': '123'}}
- })
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_users_groups.py b/cloudinit/config/tests/test_users_groups.py
deleted file mode 100644
index df89ddb3..00000000
--- a/cloudinit/config/tests/test_users_groups.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-
-from cloudinit.config import cc_users_groups
-from cloudinit.tests.helpers import CiTestCase, mock
-
-MODPATH = "cloudinit.config.cc_users_groups"
-
-
-@mock.patch('cloudinit.distros.ubuntu.Distro.create_group')
-@mock.patch('cloudinit.distros.ubuntu.Distro.create_user')
-class TestHandleUsersGroups(CiTestCase):
- """Test cc_users_groups handling of config."""
-
- with_logs = True
-
- def test_handle_no_cfg_creates_no_users_or_groups(self, m_user, m_group):
- """Test handle with no config will not create users or groups."""
- cfg = {} # merged cloud-config
- # System config defines a default user for the distro.
- sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True,
- 'groups': ['lxd', 'sudo'],
- 'shell': '/bin/bash'}}
- metadata = {}
- cloud = self.tmp_cloud(
- distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata)
- cc_users_groups.handle('modulename', cfg, cloud, None, None)
- m_user.assert_not_called()
- m_group.assert_not_called()
-
- def test_handle_users_in_cfg_calls_create_users(self, m_user, m_group):
- """When users in config, create users with distro.create_user."""
- cfg = {'users': ['default', {'name': 'me2'}]} # merged cloud-config
- # System config defines a default user for the distro.
- sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True,
- 'groups': ['lxd', 'sudo'],
- 'shell': '/bin/bash'}}
- metadata = {}
- cloud = self.tmp_cloud(
- distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata)
- cc_users_groups.handle('modulename', cfg, cloud, None, None)
- self.assertCountEqual(
- m_user.call_args_list,
- [mock.call('ubuntu', groups='lxd,sudo', lock_passwd=True,
- shell='/bin/bash'),
- mock.call('me2', default=False)])
- m_group.assert_not_called()
-
- @mock.patch('cloudinit.distros.freebsd.Distro.create_group')
- @mock.patch('cloudinit.distros.freebsd.Distro.create_user')
- def test_handle_users_in_cfg_calls_create_users_on_bsd(
- self,
- m_fbsd_user,
- m_fbsd_group,
- m_linux_user,
- m_linux_group,
- ):
- """When users in config, create users with freebsd.create_user."""
- cfg = {'users': ['default', {'name': 'me2'}]} # merged cloud-config
- # System config defines a default user for the distro.
- sys_cfg = {'default_user': {'name': 'freebsd', 'lock_passwd': True,
- 'groups': ['wheel'],
- 'shell': '/bin/tcsh'}}
- metadata = {}
- cloud = self.tmp_cloud(
- distro='freebsd', sys_cfg=sys_cfg, metadata=metadata)
- cc_users_groups.handle('modulename', cfg, cloud, None, None)
- self.assertCountEqual(
- m_fbsd_user.call_args_list,
- [mock.call('freebsd', groups='wheel', lock_passwd=True,
- shell='/bin/tcsh'),
- mock.call('me2', default=False)])
- m_fbsd_group.assert_not_called()
- m_linux_group.assert_not_called()
- m_linux_user.assert_not_called()
-
- def test_users_with_ssh_redirect_user_passes_keys(self, m_user, m_group):
- """When ssh_redirect_user is True pass default user and cloud keys."""
- cfg = {
- 'users': ['default', {'name': 'me2', 'ssh_redirect_user': True}]}
- # System config defines a default user for the distro.
- sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True,
- 'groups': ['lxd', 'sudo'],
- 'shell': '/bin/bash'}}
- metadata = {'public-keys': ['key1']}
- cloud = self.tmp_cloud(
- distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata)
- cc_users_groups.handle('modulename', cfg, cloud, None, None)
- self.assertCountEqual(
- m_user.call_args_list,
- [mock.call('ubuntu', groups='lxd,sudo', lock_passwd=True,
- shell='/bin/bash'),
- mock.call('me2', cloud_public_ssh_keys=['key1'], default=False,
- ssh_redirect_user='ubuntu')])
- m_group.assert_not_called()
-
- def test_users_with_ssh_redirect_user_default_str(self, m_user, m_group):
- """When ssh_redirect_user is 'default' pass default username."""
- cfg = {
- 'users': ['default', {'name': 'me2',
- 'ssh_redirect_user': 'default'}]}
- # System config defines a default user for the distro.
- sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True,
- 'groups': ['lxd', 'sudo'],
- 'shell': '/bin/bash'}}
- metadata = {'public-keys': ['key1']}
- cloud = self.tmp_cloud(
- distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata)
- cc_users_groups.handle('modulename', cfg, cloud, None, None)
- self.assertCountEqual(
- m_user.call_args_list,
- [mock.call('ubuntu', groups='lxd,sudo', lock_passwd=True,
- shell='/bin/bash'),
- mock.call('me2', cloud_public_ssh_keys=['key1'], default=False,
- ssh_redirect_user='ubuntu')])
- m_group.assert_not_called()
-
- def test_users_with_ssh_redirect_user_non_default(self, m_user, m_group):
- """Warn when ssh_redirect_user is not 'default'."""
- cfg = {
- 'users': ['default', {'name': 'me2',
- 'ssh_redirect_user': 'snowflake'}]}
- # System config defines a default user for the distro.
- sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True,
- 'groups': ['lxd', 'sudo'],
- 'shell': '/bin/bash'}}
- metadata = {'public-keys': ['key1']}
- cloud = self.tmp_cloud(
- distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata)
- with self.assertRaises(ValueError) as context_manager:
- cc_users_groups.handle('modulename', cfg, cloud, None, None)
- m_group.assert_not_called()
- self.assertEqual(
- 'Not creating user me2. Invalid value of ssh_redirect_user:'
- ' snowflake. Expected values: true, default or false.',
- str(context_manager.exception))
-
- def test_users_with_ssh_redirect_user_default_false(self, m_user, m_group):
- """When unspecified ssh_redirect_user is false and not set up."""
- cfg = {'users': ['default', {'name': 'me2'}]}
- # System config defines a default user for the distro.
- sys_cfg = {'default_user': {'name': 'ubuntu', 'lock_passwd': True,
- 'groups': ['lxd', 'sudo'],
- 'shell': '/bin/bash'}}
- metadata = {'public-keys': ['key1']}
- cloud = self.tmp_cloud(
- distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata)
- cc_users_groups.handle('modulename', cfg, cloud, None, None)
- self.assertCountEqual(
- m_user.call_args_list,
- [mock.call('ubuntu', groups='lxd,sudo', lock_passwd=True,
- shell='/bin/bash'),
- mock.call('me2', default=False)])
- m_group.assert_not_called()
-
- def test_users_ssh_redirect_user_and_no_default(self, m_user, m_group):
- """Warn when ssh_redirect_user is True and no default user present."""
- cfg = {
- 'users': ['default', {'name': 'me2', 'ssh_redirect_user': True}]}
- # System config defines *no* default user for the distro.
- sys_cfg = {}
- metadata = {} # no public-keys defined
- cloud = self.tmp_cloud(
- distro='ubuntu', sys_cfg=sys_cfg, metadata=metadata)
- cc_users_groups.handle('modulename', cfg, cloud, None, None)
- m_user.assert_called_once_with('me2', default=False)
- m_group.assert_not_called()
- self.assertEqual(
- 'WARNING: Ignoring ssh_redirect_user: True for me2. No'
- ' default_user defined. Perhaps missing'
- ' cloud configuration users: [default, ..].\n',
- self.logs.getvalue())