summaryrefslogtreecommitdiff
path: root/cloudinit/config/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/config/tests')
-rw-r--r--cloudinit/config/tests/test_apt_pipelining.py28
-rw-r--r--cloudinit/config/tests/test_set_passwords.py88
-rw-r--r--cloudinit/config/tests/test_snap.py2
-rw-r--r--cloudinit/config/tests/test_ssh.py202
-rw-r--r--cloudinit/config/tests/test_ubuntu_advantage.py347
-rw-r--r--cloudinit/config/tests/test_ubuntu_drivers.py237
-rw-r--r--cloudinit/config/tests/test_users_groups.py28
7 files changed, 769 insertions, 163 deletions
diff --git a/cloudinit/config/tests/test_apt_pipelining.py b/cloudinit/config/tests/test_apt_pipelining.py
new file mode 100644
index 00000000..2a6bb10b
--- /dev/null
+++ b/cloudinit/config/tests/test_apt_pipelining.py
@@ -0,0 +1,28 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Tests cc_apt_pipelining handler"""
+
+import cloudinit.config.cc_apt_pipelining as cc_apt_pipelining
+
+from cloudinit.tests.helpers import CiTestCase, mock
+
+
+class TestAptPipelining(CiTestCase):
+
+ @mock.patch('cloudinit.config.cc_apt_pipelining.util.write_file')
+ def test_not_disabled_by_default(self, m_write_file):
+ """ensure that default behaviour is to not disable pipelining"""
+ cc_apt_pipelining.handle('foo', {}, None, mock.MagicMock(), None)
+ self.assertEqual(0, m_write_file.call_count)
+
+ @mock.patch('cloudinit.config.cc_apt_pipelining.util.write_file')
+ def test_false_disables_pipelining(self, m_write_file):
+ """ensure that pipelining can be disabled with correct config"""
+ cc_apt_pipelining.handle(
+ 'foo', {'apt_pipelining': 'false'}, None, mock.MagicMock(), None)
+ self.assertEqual(1, m_write_file.call_count)
+ args, _ = m_write_file.call_args
+ self.assertEqual(cc_apt_pipelining.DEFAULT_FILE, args[0])
+ self.assertIn('Pipeline-Depth "0"', args[1])
+
+# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_set_passwords.py b/cloudinit/config/tests/test_set_passwords.py
index b051ec82..8247c388 100644
--- a/cloudinit/config/tests/test_set_passwords.py
+++ b/cloudinit/config/tests/test_set_passwords.py
@@ -1,6 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.
-import mock
+from unittest import mock
from cloudinit.config import cc_set_passwords as setpass
from cloudinit.tests.helpers import CiTestCase
@@ -45,7 +45,7 @@ class TestHandleSshPwauth(CiTestCase):
"""If config is not updated, then no system restart should be done."""
setpass.handle_ssh_pwauth(True)
m_subp.assert_not_called()
- self.assertIn("No need to restart ssh", self.logs.getvalue())
+ self.assertIn("No need to restart SSH", self.logs.getvalue())
@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch(MODPATH + "util.subp")
@@ -68,4 +68,88 @@ class TestHandleSshPwauth(CiTestCase):
m_update.assert_called_with({optname: optval})
m_subp.assert_not_called()
+
+class TestSetPasswordsHandle(CiTestCase):
+ """Test cc_set_passwords.handle"""
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestSetPasswordsHandle, self).setUp()
+ self.add_patch('cloudinit.config.cc_set_passwords.sys.stderr', 'm_err')
+
+ def test_handle_on_empty_config(self, *args):
+ """handle logs that no password has changed when config is empty."""
+ cloud = self.tmp_cloud(distro='ubuntu')
+ setpass.handle(
+ 'IGNORED', cfg={}, cloud=cloud, log=self.logger, args=[])
+ self.assertEqual(
+ "DEBUG: Leaving SSH config 'PasswordAuthentication' unchanged. "
+ 'ssh_pwauth=None\n',
+ self.logs.getvalue())
+
+ @mock.patch(MODPATH + "util.subp")
+ def test_handle_on_chpasswd_list_parses_common_hashes(self, m_subp):
+ """handle parses command password hashes."""
+ cloud = self.tmp_cloud(distro='ubuntu')
+ valid_hashed_pwds = [
+ 'root:$2y$10$8BQjxjVByHA/Ee.O1bCXtO8S7Y5WojbXWqnqYpUW.BrPx/'
+ 'Dlew1Va',
+ 'ubuntu:$6$5hOurLPO$naywm3Ce0UlmZg9gG2Fl9acWCVEoakMMC7dR52q'
+ 'SDexZbrN9z8yHxhUM2b.sxpguSwOlbOQSW/HpXazGGx3oo1']
+ cfg = {'chpasswd': {'list': valid_hashed_pwds}}
+ with mock.patch(MODPATH + 'util.subp') as m_subp:
+ setpass.handle(
+ 'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
+ self.assertIn(
+ 'DEBUG: Handling input for chpasswd as list.',
+ self.logs.getvalue())
+ self.assertIn(
+ "DEBUG: Setting hashed password for ['root', 'ubuntu']",
+ self.logs.getvalue())
+ self.assertEqual(
+ [mock.call(['chpasswd', '-e'],
+ '\n'.join(valid_hashed_pwds) + '\n')],
+ m_subp.call_args_list)
+
+ @mock.patch(MODPATH + "util.is_FreeBSD")
+ @mock.patch(MODPATH + "util.subp")
+ def test_freebsd_calls_custom_pw_cmds_to_set_and_expire_passwords(
+ self, m_subp, m_is_freebsd):
+ """FreeBSD calls custom pw commands instead of chpasswd and passwd"""
+ m_is_freebsd.return_value = True
+ cloud = self.tmp_cloud(distro='freebsd')
+ valid_pwds = ['ubuntu:passw0rd']
+ cfg = {'chpasswd': {'list': valid_pwds}}
+ setpass.handle(
+ 'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
+ self.assertEqual([
+ mock.call(['pw', 'usermod', 'ubuntu', '-h', '0'], data='passw0rd',
+ logstring="chpasswd for ubuntu"),
+ mock.call(['pw', 'usermod', 'ubuntu', '-p', '01-Jan-1970'])],
+ m_subp.call_args_list)
+
+ @mock.patch(MODPATH + "util.is_FreeBSD")
+ @mock.patch(MODPATH + "util.subp")
+ def test_handle_on_chpasswd_list_creates_random_passwords(self, m_subp,
+ m_is_freebsd):
+ """handle parses command set random passwords."""
+ m_is_freebsd.return_value = False
+ cloud = self.tmp_cloud(distro='ubuntu')
+ valid_random_pwds = [
+ 'root:R',
+ 'ubuntu:RANDOM']
+ cfg = {'chpasswd': {'expire': 'false', 'list': valid_random_pwds}}
+ with mock.patch(MODPATH + 'util.subp') as m_subp:
+ setpass.handle(
+ 'IGNORED', cfg=cfg, cloud=cloud, log=self.logger, args=[])
+ self.assertIn(
+ 'DEBUG: Handling input for chpasswd as list.',
+ self.logs.getvalue())
+ self.assertNotEqual(
+ [mock.call(['chpasswd'],
+ '\n'.join(valid_random_pwds) + '\n')],
+ m_subp.call_args_list)
+
+
# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py
index 3c472891..cbbb173d 100644
--- a/cloudinit/config/tests/test_snap.py
+++ b/cloudinit/config/tests/test_snap.py
@@ -1,7 +1,7 @@
# This file is part of cloud-init. See LICENSE file for license information.
import re
-from six import StringIO
+from io import StringIO
from cloudinit.config.cc_snap import (
ASSERTIONS_FILE, add_assertions, handle, maybe_install_squashfuse,
diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py
index c8a4271f..0c554414 100644
--- a/cloudinit/config/tests/test_ssh.py
+++ b/cloudinit/config/tests/test_ssh.py
@@ -1,9 +1,13 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import os.path
from cloudinit.config import cc_ssh
from cloudinit import ssh_util
from cloudinit.tests.helpers import CiTestCase, mock
+import logging
+
+LOG = logging.getLogger(__name__)
MODPATH = "cloudinit.config.cc_ssh."
@@ -12,6 +16,25 @@ MODPATH = "cloudinit.config.cc_ssh."
class TestHandleSsh(CiTestCase):
"""Test cc_ssh handling of ssh config."""
+ def _publish_hostkey_test_setup(self):
+ self.test_hostkeys = {
+ 'dsa': ('ssh-dss', 'AAAAB3NzaC1kc3MAAACB'),
+ 'ecdsa': ('ecdsa-sha2-nistp256', 'AAAAE2VjZ'),
+ 'ed25519': ('ssh-ed25519', 'AAAAC3NzaC1lZDI'),
+ 'rsa': ('ssh-rsa', 'AAAAB3NzaC1yc2EAAA'),
+ }
+ self.test_hostkey_files = []
+ hostkey_tmpdir = self.tmp_dir()
+ for key_type in ['dsa', 'ecdsa', 'ed25519', 'rsa']:
+ key_data = self.test_hostkeys[key_type]
+ filename = 'ssh_host_%s_key.pub' % key_type
+ filepath = os.path.join(hostkey_tmpdir, filename)
+ self.test_hostkey_files.append(filepath)
+ with open(filepath, 'w') as f:
+ f.write(' '.join(key_data))
+
+ cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, 'ssh_host_%s_key')
+
def test_apply_credentials_with_user(self, m_setup_keys):
"""Apply keys for the given user and root."""
keys = ["key1"]
@@ -64,9 +87,10 @@ class TestHandleSsh(CiTestCase):
# Mock os.path.exits to True to short-circuit the key writing logic
m_path_exists.return_value = True
m_nug.return_value = ([], {})
+ cc_ssh.PUBLISH_HOST_KEYS = False
cloud = self.tmp_cloud(
distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, None, None)
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
options = ssh_util.DISABLE_USER_OPTS.replace("$USER", "NONE")
options = options.replace("$DISABLE_USER", "root")
m_glob.assert_called_once_with('/etc/ssh/ssh_host_*key*')
@@ -82,6 +106,31 @@ class TestHandleSsh(CiTestCase):
@mock.patch(MODPATH + "glob.glob")
@mock.patch(MODPATH + "ug_util.normalize_users_groups")
@mock.patch(MODPATH + "os.path.exists")
+ def test_dont_allow_public_ssh_keys(self, m_path_exists, m_nug,
+ m_glob, m_setup_keys):
+ """Test allow_public_ssh_keys=False ignores ssh public keys from
+ platform.
+ """
+ cfg = {"allow_public_ssh_keys": False}
+ keys = ["key1"]
+ user = "clouduser"
+ m_glob.return_value = [] # Return no matching keys to prevent removal
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
+
+ options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
+ options = options.replace("$DISABLE_USER", "root")
+ self.assertEqual([mock.call(set(), user),
+ mock.call(set(), "root", options=options)],
+ m_setup_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
def test_handle_no_cfg_and_default_root(self, m_path_exists, m_nug,
m_glob, m_setup_keys):
"""Test handle with no config and a default distro user."""
@@ -94,7 +143,7 @@ class TestHandleSsh(CiTestCase):
m_nug.return_value = ({user: {"default": user}}, {})
cloud = self.tmp_cloud(
distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, None, None)
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
options = options.replace("$DISABLE_USER", "root")
@@ -119,7 +168,7 @@ class TestHandleSsh(CiTestCase):
m_nug.return_value = ({user: {"default": user}}, {})
cloud = self.tmp_cloud(
distro='ubuntu', metadata={'public-keys': keys})
- cc_ssh.handle("name", cfg, cloud, None, None)
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user)
options = options.replace("$DISABLE_USER", "root")
@@ -144,8 +193,153 @@ class TestHandleSsh(CiTestCase):
cloud = self.tmp_cloud(
distro='ubuntu', metadata={'public-keys': keys})
cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
- cc_ssh.handle("name", cfg, cloud, None, None)
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
self.assertEqual([mock.call(set(keys), user),
mock.call(set(keys), "root", options="")],
m_setup_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_default(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = True
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {}
+ expected_call = [self.test_hostkeys[key_type] for key_type
+ in ['ecdsa', 'ed25519', 'rsa']]
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
+ self.assertEqual([mock.call(expected_call)],
+ cloud.datasource.publish_host_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_config_enable(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = False
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {'ssh_publish_hostkeys': {'enabled': True}}
+ expected_call = [self.test_hostkeys[key_type] for key_type
+ in ['ecdsa', 'ed25519', 'rsa']]
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
+ self.assertEqual([mock.call(expected_call)],
+ cloud.datasource.publish_host_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_config_disable(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = True
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {'ssh_publish_hostkeys': {'enabled': False}}
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
+ self.assertFalse(cloud.datasource.publish_host_keys.call_args_list)
+ cloud.datasource.publish_host_keys.assert_not_called()
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_config_blacklist(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = True
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {'ssh_publish_hostkeys': {'enabled': True,
+ 'blacklist': ['dsa', 'rsa']}}
+ expected_call = [self.test_hostkeys[key_type] for key_type
+ in ['ecdsa', 'ed25519']]
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
+ self.assertEqual([mock.call(expected_call)],
+ cloud.datasource.publish_host_keys.call_args_list)
+
+ @mock.patch(MODPATH + "glob.glob")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_publish_hostkeys_empty_blacklist(
+ self, m_path_exists, m_nug, m_glob, m_setup_keys):
+ """Test handle with various configs for ssh_publish_hostkeys."""
+ self._publish_hostkey_test_setup()
+ cc_ssh.PUBLISH_HOST_KEYS = True
+ keys = ["key1"]
+ user = "clouduser"
+ # Return no matching keys for first glob, test keys for second.
+ m_glob.side_effect = iter([
+ [],
+ self.test_hostkey_files,
+ ])
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_path_exists.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = self.tmp_cloud(
+ distro='ubuntu', metadata={'public-keys': keys})
+ cloud.datasource.publish_host_keys = mock.Mock()
+
+ cfg = {'ssh_publish_hostkeys': {'enabled': True,
+ 'blacklist': []}}
+ expected_call = [self.test_hostkeys[key_type] for key_type
+ in ['dsa', 'ecdsa', 'ed25519', 'rsa']]
+ cc_ssh.handle("name", cfg, cloud, LOG, None)
+ self.assertEqual([mock.call(expected_call)],
+ cloud.datasource.publish_host_keys.call_args_list)
diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py
index b7cf9bee..8c4161ef 100644
--- a/cloudinit/config/tests/test_ubuntu_advantage.py
+++ b/cloudinit/config/tests/test_ubuntu_advantage.py
@@ -1,10 +1,7 @@
# This file is part of cloud-init. See LICENSE file for license information.
-import re
-from six import StringIO
-
from cloudinit.config.cc_ubuntu_advantage import (
- handle, maybe_install_ua_tools, run_commands, schema)
+ configure_ua, handle, maybe_install_ua_tools, schema)
from cloudinit.config.schema import validate_cloudconfig_schema
from cloudinit import util
from cloudinit.tests.helpers import (
@@ -20,90 +17,120 @@ class FakeCloud(object):
self.distro = distro
-class TestRunCommands(CiTestCase):
+class TestConfigureUA(CiTestCase):
with_logs = True
allowed_subp = [CiTestCase.SUBP_SHELL_TRUE]
def setUp(self):
- super(TestRunCommands, self).setUp()
+ super(TestConfigureUA, self).setUp()
self.tmp = self.tmp_dir()
@mock.patch('%s.util.subp' % MPATH)
- def test_run_commands_on_empty_list(self, m_subp):
- """When provided with an empty list, run_commands does nothing."""
- run_commands([])
- self.assertEqual('', self.logs.getvalue())
- m_subp.assert_not_called()
-
- def test_run_commands_on_non_list_or_dict(self):
- """When provided an invalid type, run_commands raises an error."""
- with self.assertRaises(TypeError) as context_manager:
- run_commands(commands="I'm Not Valid")
+ def test_configure_ua_attach_error(self, m_subp):
+ """Errors from ua attach command are raised."""
+ m_subp.side_effect = util.ProcessExecutionError(
+ 'Invalid token SomeToken')
+ with self.assertRaises(RuntimeError) as context_manager:
+ configure_ua(token='SomeToken')
self.assertEqual(
- "commands parameter was not a list or dict: I'm Not Valid",
+ 'Failure attaching Ubuntu Advantage:\nUnexpected error while'
+ ' running command.\nCommand: -\nExit code: -\nReason: -\n'
+ 'Stdout: Invalid token SomeToken\nStderr: -',
str(context_manager.exception))
- def test_run_command_logs_commands_and_exit_codes_to_stderr(self):
- """All exit codes are logged to stderr."""
- outfile = self.tmp_path('output.log', dir=self.tmp)
-
- cmd1 = 'echo "HI" >> %s' % outfile
- cmd2 = 'bogus command'
- cmd3 = 'echo "MOM" >> %s' % outfile
- commands = [cmd1, cmd2, cmd3]
-
- mock_path = '%s.sys.stderr' % MPATH
- with mock.patch(mock_path, new_callable=StringIO) as m_stderr:
- with self.assertRaises(RuntimeError) as context_manager:
- run_commands(commands=commands)
-
- self.assertIsNotNone(
- re.search(r'bogus: (command )?not found',
- str(context_manager.exception)),
- msg='Expected bogus command not found')
- expected_stderr_log = '\n'.join([
- 'Begin run command: {cmd}'.format(cmd=cmd1),
- 'End run command: exit(0)',
- 'Begin run command: {cmd}'.format(cmd=cmd2),
- 'ERROR: End run command: exit(127)',
- 'Begin run command: {cmd}'.format(cmd=cmd3),
- 'End run command: exit(0)\n'])
- self.assertEqual(expected_stderr_log, m_stderr.getvalue())
-
- def test_run_command_as_lists(self):
- """When commands are specified as a list, run them in order."""
- outfile = self.tmp_path('output.log', dir=self.tmp)
-
- cmd1 = 'echo "HI" >> %s' % outfile
- cmd2 = 'echo "MOM" >> %s' % outfile
- commands = [cmd1, cmd2]
- with mock.patch('%s.sys.stderr' % MPATH, new_callable=StringIO):
- run_commands(commands=commands)
+ @mock.patch('%s.util.subp' % MPATH)
+ def test_configure_ua_attach_with_token(self, m_subp):
+ """When token is provided, attach the machine to ua using the token."""
+ configure_ua(token='SomeToken')
+ m_subp.assert_called_once_with(['ua', 'attach', 'SomeToken'])
+ self.assertEqual(
+ 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
+ self.logs.getvalue())
+
+ @mock.patch('%s.util.subp' % MPATH)
+ def test_configure_ua_attach_on_service_error(self, m_subp):
+ """all services should be enabled and then any failures raised"""
+ def fake_subp(cmd, capture=None):
+ fail_cmds = [['ua', 'enable', svc] for svc in ['esm', 'cc']]
+ if cmd in fail_cmds and capture:
+ svc = cmd[-1]
+ raise util.ProcessExecutionError(
+ 'Invalid {} credentials'.format(svc.upper()))
+
+ m_subp.side_effect = fake_subp
+
+ with self.assertRaises(RuntimeError) as context_manager:
+ configure_ua(token='SomeToken', enable=['esm', 'cc', 'fips'])
+ self.assertEqual(
+ m_subp.call_args_list,
+ [mock.call(['ua', 'attach', 'SomeToken']),
+ mock.call(['ua', 'enable', 'esm'], capture=True),
+ mock.call(['ua', 'enable', 'cc'], capture=True),
+ mock.call(['ua', 'enable', 'fips'], capture=True)])
self.assertIn(
- 'DEBUG: Running user-provided ubuntu-advantage commands',
+ 'WARNING: Failure enabling "esm":\nUnexpected error'
+ ' while running command.\nCommand: -\nExit code: -\nReason: -\n'
+ 'Stdout: Invalid ESM credentials\nStderr: -\n',
self.logs.getvalue())
- self.assertEqual('HI\nMOM\n', util.load_file(outfile))
self.assertIn(
- 'WARNING: Non-ubuntu-advantage commands in ubuntu-advantage'
- ' config:',
+ 'WARNING: Failure enabling "cc":\nUnexpected error'
+ ' while running command.\nCommand: -\nExit code: -\nReason: -\n'
+ 'Stdout: Invalid CC credentials\nStderr: -\n',
+ self.logs.getvalue())
+ self.assertEqual(
+ 'Failure enabling Ubuntu Advantage service(s): "esm", "cc"',
+ str(context_manager.exception))
+
+ @mock.patch('%s.util.subp' % MPATH)
+ def test_configure_ua_attach_with_empty_services(self, m_subp):
+ """When services is an empty list, do not auto-enable attach."""
+ configure_ua(token='SomeToken', enable=[])
+ m_subp.assert_called_once_with(['ua', 'attach', 'SomeToken'])
+ self.assertEqual(
+ 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
self.logs.getvalue())
- def test_run_command_dict_sorted_as_command_script(self):
- """When commands are a dict, sort them and run."""
- outfile = self.tmp_path('output.log', dir=self.tmp)
- cmd1 = 'echo "HI" >> %s' % outfile
- cmd2 = 'echo "MOM" >> %s' % outfile
- commands = {'02': cmd1, '01': cmd2}
- with mock.patch('%s.sys.stderr' % MPATH, new_callable=StringIO):
- run_commands(commands=commands)
+ @mock.patch('%s.util.subp' % MPATH)
+ def test_configure_ua_attach_with_specific_services(self, m_subp):
+ """When services a list, only enable specific services."""
+ configure_ua(token='SomeToken', enable=['fips'])
+ self.assertEqual(
+ m_subp.call_args_list,
+ [mock.call(['ua', 'attach', 'SomeToken']),
+ mock.call(['ua', 'enable', 'fips'], capture=True)])
+ self.assertEqual(
+ 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
+ self.logs.getvalue())
+
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH, mock.MagicMock())
+ @mock.patch('%s.util.subp' % MPATH)
+ def test_configure_ua_attach_with_string_services(self, m_subp):
+ """When services a string, treat as singleton list and warn"""
+ configure_ua(token='SomeToken', enable='fips')
+ self.assertEqual(
+ m_subp.call_args_list,
+ [mock.call(['ua', 'attach', 'SomeToken']),
+ mock.call(['ua', 'enable', 'fips'], capture=True)])
+ self.assertEqual(
+ 'WARNING: ubuntu_advantage: enable should be a list, not a'
+ ' string; treating as a single enable\n'
+ 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
+ self.logs.getvalue())
- expected_messages = [
- 'DEBUG: Running user-provided ubuntu-advantage commands']
- for message in expected_messages:
- self.assertIn(message, self.logs.getvalue())
- self.assertEqual('MOM\nHI\n', util.load_file(outfile))
+ @mock.patch('%s.util.subp' % MPATH)
+ def test_configure_ua_attach_with_weird_services(self, m_subp):
+ """When services not string or list, warn but still attach"""
+ configure_ua(token='SomeToken', enable={'deffo': 'wont work'})
+ self.assertEqual(
+ m_subp.call_args_list,
+ [mock.call(['ua', 'attach', 'SomeToken'])])
+ self.assertEqual(
+ 'WARNING: ubuntu_advantage: enable should be a list, not a'
+ ' dict; skipping enabling services\n'
+ 'DEBUG: Attaching to Ubuntu Advantage. ua attach SomeToken\n',
+ self.logs.getvalue())
@skipUnlessJsonSchema()
@@ -112,90 +139,50 @@ class TestSchema(CiTestCase, SchemaTestCaseMixin):
with_logs = True
schema = schema
- def test_schema_warns_on_ubuntu_advantage_not_as_dict(self):
- """If ubuntu-advantage configuration is not a dict, emit a warning."""
- validate_cloudconfig_schema({'ubuntu-advantage': 'wrong type'}, schema)
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH)
+ @mock.patch('%s.configure_ua' % MPATH)
+ def test_schema_warns_on_ubuntu_advantage_not_dict(self, _cfg, _):
+ """If ubuntu_advantage configuration is not a dict, emit a warning."""
+ validate_cloudconfig_schema({'ubuntu_advantage': 'wrong type'}, schema)
self.assertEqual(
- "WARNING: Invalid config:\nubuntu-advantage: 'wrong type' is not"
+ "WARNING: Invalid config:\nubuntu_advantage: 'wrong type' is not"
" of type 'object'\n",
self.logs.getvalue())
- @mock.patch('%s.run_commands' % MPATH)
- def test_schema_disallows_unknown_keys(self, _):
- """Unknown keys in ubuntu-advantage configuration emit warnings."""
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH)
+ @mock.patch('%s.configure_ua' % MPATH)
+ def test_schema_disallows_unknown_keys(self, _cfg, _):
+ """Unknown keys in ubuntu_advantage configuration emit warnings."""
validate_cloudconfig_schema(
- {'ubuntu-advantage': {'commands': ['ls'], 'invalid-key': ''}},
+ {'ubuntu_advantage': {'token': 'winner', 'invalid-key': ''}},
schema)
self.assertIn(
- 'WARNING: Invalid config:\nubuntu-advantage: Additional properties'
+ 'WARNING: Invalid config:\nubuntu_advantage: Additional properties'
" are not allowed ('invalid-key' was unexpected)",
self.logs.getvalue())
- def test_warn_schema_requires_commands(self):
- """Warn when ubuntu-advantage configuration lacks commands."""
- validate_cloudconfig_schema(
- {'ubuntu-advantage': {}}, schema)
- self.assertEqual(
- "WARNING: Invalid config:\nubuntu-advantage: 'commands' is a"
- " required property\n",
- self.logs.getvalue())
-
- @mock.patch('%s.run_commands' % MPATH)
- def test_warn_schema_commands_is_not_list_or_dict(self, _):
- """Warn when ubuntu-advantage:commands config is not a list or dict."""
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH)
+ @mock.patch('%s.configure_ua' % MPATH)
+ def test_warn_schema_requires_token(self, _cfg, _):
+ """Warn if ubuntu_advantage configuration lacks token."""
validate_cloudconfig_schema(
- {'ubuntu-advantage': {'commands': 'broken'}}, schema)
+ {'ubuntu_advantage': {'enable': ['esm']}}, schema)
self.assertEqual(
- "WARNING: Invalid config:\nubuntu-advantage.commands: 'broken' is"
- " not of type 'object', 'array'\n",
- self.logs.getvalue())
+ "WARNING: Invalid config:\nubuntu_advantage:"
+ " 'token' is a required property\n", self.logs.getvalue())
- @mock.patch('%s.run_commands' % MPATH)
- def test_warn_schema_when_commands_is_empty(self, _):
- """Emit warnings when ubuntu-advantage:commands is empty."""
- validate_cloudconfig_schema(
- {'ubuntu-advantage': {'commands': []}}, schema)
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH)
+ @mock.patch('%s.configure_ua' % MPATH)
+ def test_warn_schema_services_is_not_list_or_dict(self, _cfg, _):
+ """Warn when ubuntu_advantage:enable config is not a list."""
validate_cloudconfig_schema(
- {'ubuntu-advantage': {'commands': {}}}, schema)
+ {'ubuntu_advantage': {'enable': 'needslist'}}, schema)
self.assertEqual(
- "WARNING: Invalid config:\nubuntu-advantage.commands: [] is too"
- " short\nWARNING: Invalid config:\nubuntu-advantage.commands: {}"
- " does not have enough properties\n",
+ "WARNING: Invalid config:\nubuntu_advantage: 'token' is a"
+ " required property\nubuntu_advantage.enable: 'needslist'"
+ " is not of type 'array'\n",
self.logs.getvalue())
- @mock.patch('%s.run_commands' % MPATH)
- def test_schema_when_commands_are_list_or_dict(self, _):
- """No warnings when ubuntu-advantage:commands are a list or dict."""
- validate_cloudconfig_schema(
- {'ubuntu-advantage': {'commands': ['valid']}}, schema)
- validate_cloudconfig_schema(
- {'ubuntu-advantage': {'commands': {'01': 'also valid'}}}, schema)
- self.assertEqual('', self.logs.getvalue())
-
- def test_duplicates_are_fine_array_array(self):
- """Duplicated commands array/array entries are allowed."""
- self.assertSchemaValid(
- {'commands': [["echo", "bye"], ["echo" "bye"]]},
- "command entries can be duplicate.")
-
- def test_duplicates_are_fine_array_string(self):
- """Duplicated commands array/string entries are allowed."""
- self.assertSchemaValid(
- {'commands': ["echo bye", "echo bye"]},
- "command entries can be duplicate.")
-
- def test_duplicates_are_fine_dict_array(self):
- """Duplicated commands dict/array entries are allowed."""
- self.assertSchemaValid(
- {'commands': {'00': ["echo", "bye"], '01': ["echo", "bye"]}},
- "command entries can be duplicate.")
-
- def test_duplicates_are_fine_dict_string(self):
- """Duplicated commands dict/string entries are allowed."""
- self.assertSchemaValid(
- {'commands': {'00': "echo bye", '01': "echo bye"}},
- "command entries can be duplicate.")
-
class TestHandle(CiTestCase):
@@ -205,41 +192,89 @@ class TestHandle(CiTestCase):
super(TestHandle, self).setUp()
self.tmp = self.tmp_dir()
- @mock.patch('%s.run_commands' % MPATH)
@mock.patch('%s.validate_cloudconfig_schema' % MPATH)
- def test_handle_no_config(self, m_schema, m_run):
+ def test_handle_no_config(self, m_schema):
"""When no ua-related configuration is provided, nothing happens."""
cfg = {}
handle('ua-test', cfg=cfg, cloud=None, log=self.logger, args=None)
self.assertIn(
- "DEBUG: Skipping module named ua-test, no 'ubuntu-advantage' key"
- " in config",
+ "DEBUG: Skipping module named ua-test, no 'ubuntu_advantage'"
+ ' configuration found',
self.logs.getvalue())
m_schema.assert_not_called()
- m_run.assert_not_called()
+ @mock.patch('%s.configure_ua' % MPATH)
@mock.patch('%s.maybe_install_ua_tools' % MPATH)
- def test_handle_tries_to_install_ubuntu_advantage_tools(self, m_install):
+ def test_handle_tries_to_install_ubuntu_advantage_tools(
+ self, m_install, m_cfg):
"""If ubuntu_advantage is provided, try installing ua-tools package."""
- cfg = {'ubuntu-advantage': {}}
+ cfg = {'ubuntu_advantage': {'token': 'valid'}}
mycloud = FakeCloud(None)
handle('nomatter', cfg=cfg, cloud=mycloud, log=self.logger, args=None)
m_install.assert_called_once_with(mycloud)
+ @mock.patch('%s.configure_ua' % MPATH)
@mock.patch('%s.maybe_install_ua_tools' % MPATH)
- def test_handle_runs_commands_provided(self, m_install):
- """When commands are specified as a list, run them."""
- outfile = self.tmp_path('output.log', dir=self.tmp)
+ def test_handle_passes_credentials_and_services_to_configure_ua(
+ self, m_install, m_configure_ua):
+ """All ubuntu_advantage config keys are passed to configure_ua."""
+ cfg = {'ubuntu_advantage': {'token': 'token', 'enable': ['esm']}}
+ handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
+ m_configure_ua.assert_called_once_with(
+ token='token', enable=['esm'])
+
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH, mock.MagicMock())
+ @mock.patch('%s.configure_ua' % MPATH)
+ def test_handle_warns_on_deprecated_ubuntu_advantage_key_w_config(
+ self, m_configure_ua):
+ """Warning when ubuntu-advantage key is present with new config"""
+ cfg = {'ubuntu-advantage': {'token': 'token', 'enable': ['esm']}}
+ handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
+ self.assertEqual(
+ 'WARNING: Deprecated configuration key "ubuntu-advantage"'
+ ' provided. Expected underscore delimited "ubuntu_advantage";'
+ ' will attempt to continue.',
+ self.logs.getvalue().splitlines()[0])
+ m_configure_ua.assert_called_once_with(
+ token='token', enable=['esm'])
+
+ def test_handle_error_on_deprecated_commands_key_dashed(self):
+ """Error when commands is present in ubuntu-advantage key."""
+ cfg = {'ubuntu-advantage': {'commands': 'nogo'}}
+ with self.assertRaises(RuntimeError) as context_manager:
+ handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
+ self.assertEqual(
+ 'Deprecated configuration "ubuntu-advantage: commands" provided.'
+ ' Expected "token"',
+ str(context_manager.exception))
+
+ def test_handle_error_on_deprecated_commands_key_underscored(self):
+ """Error when commands is present in ubuntu_advantage key."""
+ cfg = {'ubuntu_advantage': {'commands': 'nogo'}}
+ with self.assertRaises(RuntimeError) as context_manager:
+ handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
+ self.assertEqual(
+ 'Deprecated configuration "ubuntu-advantage: commands" provided.'
+ ' Expected "token"',
+ str(context_manager.exception))
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH, mock.MagicMock())
+ @mock.patch('%s.configure_ua' % MPATH)
+ def test_handle_prefers_new_style_config(
+ self, m_configure_ua):
+ """ubuntu_advantage should be preferred over ubuntu-advantage"""
cfg = {
- 'ubuntu-advantage': {'commands': ['echo "HI" >> %s' % outfile,
- 'echo "MOM" >> %s' % outfile]}}
- mock_path = '%s.sys.stderr' % MPATH
- with self.allow_subp([CiTestCase.SUBP_SHELL_TRUE]):
- with mock.patch(mock_path, new_callable=StringIO):
- handle('nomatter', cfg=cfg, cloud=None, log=self.logger,
- args=None)
- self.assertEqual('HI\nMOM\n', util.load_file(outfile))
+ 'ubuntu-advantage': {'token': 'nope', 'enable': ['wrong']},
+ 'ubuntu_advantage': {'token': 'token', 'enable': ['esm']},
+ }
+ handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
+ self.assertEqual(
+ 'WARNING: Deprecated configuration key "ubuntu-advantage"'
+ ' provided. Expected underscore delimited "ubuntu_advantage";'
+ ' will attempt to continue.',
+ self.logs.getvalue().splitlines()[0])
+ m_configure_ua.assert_called_once_with(
+ token='token', enable=['esm'])
class TestMaybeInstallUATools(CiTestCase):
@@ -253,7 +288,7 @@ class TestMaybeInstallUATools(CiTestCase):
@mock.patch('%s.util.which' % MPATH)
def test_maybe_install_ua_tools_noop_when_ua_tools_present(self, m_which):
"""Do nothing if ubuntu-advantage-tools already exists."""
- m_which.return_value = '/usr/bin/ubuntu-advantage' # already installed
+ m_which.return_value = '/usr/bin/ua' # already installed
distro = mock.MagicMock()
distro.update_package_sources.side_effect = RuntimeError(
'Some apt error')
diff --git a/cloudinit/config/tests/test_ubuntu_drivers.py b/cloudinit/config/tests/test_ubuntu_drivers.py
new file mode 100644
index 00000000..46952692
--- /dev/null
+++ b/cloudinit/config/tests/test_ubuntu_drivers.py
@@ -0,0 +1,237 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+import os
+
+from cloudinit.tests.helpers import CiTestCase, skipUnlessJsonSchema, mock
+from cloudinit.config.schema import (
+ SchemaValidationError, validate_cloudconfig_schema)
+from cloudinit.config import cc_ubuntu_drivers as drivers
+from cloudinit.util import ProcessExecutionError
+
+MPATH = "cloudinit.config.cc_ubuntu_drivers."
+M_TMP_PATH = MPATH + "temp_utils.mkdtemp"
+OLD_UBUNTU_DRIVERS_ERROR_STDERR = (
+ "ubuntu-drivers: error: argument <command>: invalid choice: 'install' "
+ "(choose from 'list', 'autoinstall', 'devices', 'debug')\n")
+
+
+class AnyTempScriptAndDebconfFile(object):
+
+ def __init__(self, tmp_dir, debconf_file):
+ self.tmp_dir = tmp_dir
+ self.debconf_file = debconf_file
+
+ def __eq__(self, cmd):
+ if not len(cmd) == 2:
+ return False
+ script, debconf_file = cmd
+ if bool(script.startswith(self.tmp_dir) and script.endswith('.sh')):
+ return debconf_file == self.debconf_file
+ return False
+
+
+class TestUbuntuDrivers(CiTestCase):
+ cfg_accepted = {'drivers': {'nvidia': {'license-accepted': True}}}
+ install_gpgpu = ['ubuntu-drivers', 'install', '--gpgpu', 'nvidia']
+
+ with_logs = True
+
+ @skipUnlessJsonSchema()
+ def test_schema_requires_boolean_for_license_accepted(self):
+ with self.assertRaisesRegex(
+ SchemaValidationError, ".*license-accepted.*TRUE.*boolean"):
+ validate_cloudconfig_schema(
+ {'drivers': {'nvidia': {'license-accepted': "TRUE"}}},
+ schema=drivers.schema, strict=True)
+
+ @mock.patch(M_TMP_PATH)
+ @mock.patch(MPATH + "util.subp", return_value=('', ''))
+ @mock.patch(MPATH + "util.which", return_value=False)
+ def _assert_happy_path_taken(
+ self, config, m_which, m_subp, m_tmp):
+ """Positive path test through handle. Package should be installed."""
+ tdir = self.tmp_dir()
+ debconf_file = os.path.join(tdir, 'nvidia.template')
+ m_tmp.return_value = tdir
+ myCloud = mock.MagicMock()
+ drivers.handle('ubuntu_drivers', config, myCloud, None, None)
+ self.assertEqual([mock.call(['ubuntu-drivers-common'])],
+ myCloud.distro.install_packages.call_args_list)
+ self.assertEqual(
+ [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
+ mock.call(self.install_gpgpu)],
+ m_subp.call_args_list)
+
+ def test_handle_does_package_install(self):
+ self._assert_happy_path_taken(self.cfg_accepted)
+
+ def test_trueish_strings_are_considered_approval(self):
+ for true_value in ['yes', 'true', 'on', '1']:
+ new_config = copy.deepcopy(self.cfg_accepted)
+ new_config['drivers']['nvidia']['license-accepted'] = true_value
+ self._assert_happy_path_taken(new_config)
+
+ @mock.patch(M_TMP_PATH)
+ @mock.patch(MPATH + "util.subp")
+ @mock.patch(MPATH + "util.which", return_value=False)
+ def test_handle_raises_error_if_no_drivers_found(
+ self, m_which, m_subp, m_tmp):
+ """If ubuntu-drivers doesn't install any drivers, raise an error."""
+ tdir = self.tmp_dir()
+ debconf_file = os.path.join(tdir, 'nvidia.template')
+ m_tmp.return_value = tdir
+ myCloud = mock.MagicMock()
+
+ def fake_subp(cmd):
+ if cmd[0].startswith(tdir):
+ return
+ raise ProcessExecutionError(
+ stdout='No drivers found for installation.\n', exit_code=1)
+ m_subp.side_effect = fake_subp
+
+ with self.assertRaises(Exception):
+ drivers.handle(
+ 'ubuntu_drivers', self.cfg_accepted, myCloud, None, None)
+ self.assertEqual([mock.call(['ubuntu-drivers-common'])],
+ myCloud.distro.install_packages.call_args_list)
+ self.assertEqual(
+ [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
+ mock.call(self.install_gpgpu)],
+ m_subp.call_args_list)
+ self.assertIn('ubuntu-drivers found no drivers for installation',
+ self.logs.getvalue())
+
+ @mock.patch(MPATH + "util.subp", return_value=('', ''))
+ @mock.patch(MPATH + "util.which", return_value=False)
+ def _assert_inert_with_config(self, config, m_which, m_subp):
+ """Helper to reduce repetition when testing negative cases"""
+ myCloud = mock.MagicMock()
+ drivers.handle('ubuntu_drivers', config, myCloud, None, None)
+ self.assertEqual(0, myCloud.distro.install_packages.call_count)
+ self.assertEqual(0, m_subp.call_count)
+
+ def test_handle_inert_if_license_not_accepted(self):
+ """Ensure we don't do anything if the license is rejected."""
+ self._assert_inert_with_config(
+ {'drivers': {'nvidia': {'license-accepted': False}}})
+
+ def test_handle_inert_if_garbage_in_license_field(self):
+ """Ensure we don't do anything if unknown text is in license field."""
+ self._assert_inert_with_config(
+ {'drivers': {'nvidia': {'license-accepted': 'garbage'}}})
+
+ def test_handle_inert_if_no_license_key(self):
+ """Ensure we don't do anything if no license key."""
+ self._assert_inert_with_config({'drivers': {'nvidia': {}}})
+
+ def test_handle_inert_if_no_nvidia_key(self):
+ """Ensure we don't do anything if other license accepted."""
+ self._assert_inert_with_config(
+ {'drivers': {'acme': {'license-accepted': True}}})
+
+ def test_handle_inert_if_string_given(self):
+ """Ensure we don't do anything if string refusal given."""
+ for false_value in ['no', 'false', 'off', '0']:
+ self._assert_inert_with_config(
+ {'drivers': {'nvidia': {'license-accepted': false_value}}})
+
+ @mock.patch(MPATH + "install_drivers")
+ def test_handle_no_drivers_does_nothing(self, m_install_drivers):
+ """If no 'drivers' key in the config, nothing should be done."""
+ myCloud = mock.MagicMock()
+ myLog = mock.MagicMock()
+ drivers.handle('ubuntu_drivers', {'foo': 'bzr'}, myCloud, myLog, None)
+ self.assertIn('Skipping module named',
+ myLog.debug.call_args_list[0][0][0])
+ self.assertEqual(0, m_install_drivers.call_count)
+
+ @mock.patch(M_TMP_PATH)
+ @mock.patch(MPATH + "util.subp", return_value=('', ''))
+ @mock.patch(MPATH + "util.which", return_value=True)
+ def test_install_drivers_no_install_if_present(
+ self, m_which, m_subp, m_tmp):
+ """If 'ubuntu-drivers' is present, no package install should occur."""
+ tdir = self.tmp_dir()
+ debconf_file = os.path.join(tdir, 'nvidia.template')
+ m_tmp.return_value = tdir
+ pkg_install = mock.MagicMock()
+ drivers.install_drivers(self.cfg_accepted['drivers'],
+ pkg_install_func=pkg_install)
+ self.assertEqual(0, pkg_install.call_count)
+ self.assertEqual([mock.call('ubuntu-drivers')],
+ m_which.call_args_list)
+ self.assertEqual(
+ [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
+ mock.call(self.install_gpgpu)],
+ m_subp.call_args_list)
+
+ def test_install_drivers_rejects_invalid_config(self):
+ """install_drivers should raise TypeError if not given a config dict"""
+ pkg_install = mock.MagicMock()
+ with self.assertRaisesRegex(TypeError, ".*expected dict.*"):
+ drivers.install_drivers("mystring", pkg_install_func=pkg_install)
+ self.assertEqual(0, pkg_install.call_count)
+
+ @mock.patch(M_TMP_PATH)
+ @mock.patch(MPATH + "util.subp")
+ @mock.patch(MPATH + "util.which", return_value=False)
+ def test_install_drivers_handles_old_ubuntu_drivers_gracefully(
+ self, m_which, m_subp, m_tmp):
+ """Older ubuntu-drivers versions should emit message and raise error"""
+ tdir = self.tmp_dir()
+ debconf_file = os.path.join(tdir, 'nvidia.template')
+ m_tmp.return_value = tdir
+ myCloud = mock.MagicMock()
+
+ def fake_subp(cmd):
+ if cmd[0].startswith(tdir):
+ return
+ raise ProcessExecutionError(
+ stderr=OLD_UBUNTU_DRIVERS_ERROR_STDERR, exit_code=2)
+ m_subp.side_effect = fake_subp
+
+ with self.assertRaises(Exception):
+ drivers.handle(
+ 'ubuntu_drivers', self.cfg_accepted, myCloud, None, None)
+ self.assertEqual([mock.call(['ubuntu-drivers-common'])],
+ myCloud.distro.install_packages.call_args_list)
+ self.assertEqual(
+ [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
+ mock.call(self.install_gpgpu)],
+ m_subp.call_args_list)
+ self.assertIn('WARNING: the available version of ubuntu-drivers is'
+ ' too old to perform requested driver installation',
+ self.logs.getvalue())
+
+
+# Sub-class TestUbuntuDrivers to run the same test cases, but with a version
+class TestUbuntuDriversWithVersion(TestUbuntuDrivers):
+ cfg_accepted = {
+ 'drivers': {'nvidia': {'license-accepted': True, 'version': '123'}}}
+ install_gpgpu = ['ubuntu-drivers', 'install', '--gpgpu', 'nvidia:123']
+
+ @mock.patch(M_TMP_PATH)
+ @mock.patch(MPATH + "util.subp", return_value=('', ''))
+ @mock.patch(MPATH + "util.which", return_value=False)
+ def test_version_none_uses_latest(self, m_which, m_subp, m_tmp):
+ tdir = self.tmp_dir()
+ debconf_file = os.path.join(tdir, 'nvidia.template')
+ m_tmp.return_value = tdir
+ myCloud = mock.MagicMock()
+ version_none_cfg = {
+ 'drivers': {'nvidia': {'license-accepted': True, 'version': None}}}
+ drivers.handle(
+ 'ubuntu_drivers', version_none_cfg, myCloud, None, None)
+ self.assertEqual(
+ [mock.call(AnyTempScriptAndDebconfFile(tdir, debconf_file)),
+ mock.call(['ubuntu-drivers', 'install', '--gpgpu', 'nvidia'])],
+ m_subp.call_args_list)
+
+ def test_specifying_a_version_doesnt_override_license_acceptance(self):
+ self._assert_inert_with_config({
+ 'drivers': {'nvidia': {'license-accepted': False,
+ 'version': '123'}}
+ })
+
+# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_users_groups.py b/cloudinit/config/tests/test_users_groups.py
index ba0afae3..f620b597 100644
--- a/cloudinit/config/tests/test_users_groups.py
+++ b/cloudinit/config/tests/test_users_groups.py
@@ -46,6 +46,34 @@ class TestHandleUsersGroups(CiTestCase):
mock.call('me2', default=False)])
m_group.assert_not_called()
+ @mock.patch('cloudinit.distros.freebsd.Distro.create_group')
+ @mock.patch('cloudinit.distros.freebsd.Distro.create_user')
+ def test_handle_users_in_cfg_calls_create_users_on_bsd(
+ self,
+ m_fbsd_user,
+ m_fbsd_group,
+ m_linux_user,
+ m_linux_group,
+ ):
+ """When users in config, create users with freebsd.create_user."""
+ cfg = {'users': ['default', {'name': 'me2'}]} # merged cloud-config
+ # System config defines a default user for the distro.
+ sys_cfg = {'default_user': {'name': 'freebsd', 'lock_passwd': True,
+ 'groups': ['wheel'],
+ 'shell': '/bin/tcsh'}}
+ metadata = {}
+ cloud = self.tmp_cloud(
+ distro='freebsd', sys_cfg=sys_cfg, metadata=metadata)
+ cc_users_groups.handle('modulename', cfg, cloud, None, None)
+ self.assertItemsEqual(
+ m_fbsd_user.call_args_list,
+ [mock.call('freebsd', groups='wheel', lock_passwd=True,
+ shell='/bin/tcsh'),
+ mock.call('me2', default=False)])
+ m_fbsd_group.assert_not_called()
+ m_linux_group.assert_not_called()
+ m_linux_user.assert_not_called()
+
def test_users_with_ssh_redirect_user_passes_keys(self, m_user, m_group):
"""When ssh_redirect_user is True pass default user and cloud keys."""
cfg = {