diff options
Diffstat (limited to 'cloudinit')
-rw-r--r-- | cloudinit/config/cc_snap.py | 47 | ||||
-rw-r--r-- | cloudinit/config/cc_ubuntu_advantage.py | 173 | ||||
-rw-r--r-- | cloudinit/config/tests/test_snap.py | 52 | ||||
-rw-r--r-- | cloudinit/config/tests/test_ubuntu_advantage.py | 268 | ||||
-rw-r--r-- | cloudinit/subp.py | 57 | ||||
-rw-r--r-- | cloudinit/tests/test_subp.py | 61 |
6 files changed, 563 insertions, 95 deletions
diff --git a/cloudinit/config/cc_snap.py b/cloudinit/config/cc_snap.py index db965291..34a53fd4 100644 --- a/cloudinit/config/cc_snap.py +++ b/cloudinit/config/cc_snap.py @@ -11,6 +11,7 @@ from cloudinit import log as logging from cloudinit.config.schema import ( get_schema_doc, validate_cloudconfig_schema) from cloudinit.settings import PER_INSTANCE +from cloudinit.subp import prepend_base_command from cloudinit import util @@ -160,50 +161,6 @@ def add_assertions(assertions): util.subp(snap_cmd + [ASSERTIONS_FILE], capture=True) -def prepend_snap_commands(commands): - """Ensure user-provided commands start with SNAP_CMD, warn otherwise. - - Each command is either a list or string. Perform the following: - - When the command is a list, pop the first element if it is None - - When the command is a list, insert SNAP_CMD as the first element if - not present. - - When the command is a string containing a non-snap command, warn. - - Support cut-n-paste snap command sets from public snappy documentation. - Allow flexibility to provide non-snap environment/config setup if needed. - - @commands: List of commands. Each command element is a list or string. - - @return: List of 'fixed up' snap commands. - @raise: TypeError on invalid config item type. - """ - warnings = [] - errors = [] - fixed_commands = [] - for command in commands: - if isinstance(command, list): - if command[0] is None: # Avoid warnings by specifying None - command = command[1:] - elif command[0] != SNAP_CMD: # Automatically prepend SNAP_CMD - command.insert(0, SNAP_CMD) - elif isinstance(command, str): - if not command.startswith('%s ' % SNAP_CMD): - warnings.append(command) - else: - errors.append(str(command)) - continue - fixed_commands.append(command) - - if warnings: - LOG.warning( - 'Non-snap commands in snap config:\n%s', '\n'.join(warnings)) - if errors: - raise TypeError( - 'Invalid snap config.' - ' These commands are not a string or list:\n' + '\n'.join(errors)) - return fixed_commands - - def run_commands(commands): """Run the provided commands provided in snap:commands configuration. @@ -224,7 +181,7 @@ def run_commands(commands): 'commands parameter was not a list or dict: {commands}'.format( commands=commands)) - fixed_snap_commands = prepend_snap_commands(commands) + fixed_snap_commands = prepend_base_command('snap', commands) cmd_failures = [] for command in fixed_snap_commands: diff --git a/cloudinit/config/cc_ubuntu_advantage.py b/cloudinit/config/cc_ubuntu_advantage.py new file mode 100644 index 00000000..16b1868b --- /dev/null +++ b/cloudinit/config/cc_ubuntu_advantage.py @@ -0,0 +1,173 @@ +# Copyright (C) 2018 Canonical Ltd. +# +# This file is part of cloud-init. See LICENSE file for license information. + +"""Ubuntu advantage: manage ubuntu-advantage offerings from Canonical.""" + +import sys +from textwrap import dedent + +from cloudinit import log as logging +from cloudinit.config.schema import ( + get_schema_doc, validate_cloudconfig_schema) +from cloudinit.settings import PER_INSTANCE +from cloudinit.subp import prepend_base_command +from cloudinit import util + + +distros = ['ubuntu'] +frequency = PER_INSTANCE + +LOG = logging.getLogger(__name__) + +schema = { + 'id': 'cc_ubuntu_advantage', + 'name': 'Ubuntu Advantage', + 'title': 'Install, configure and manage ubuntu-advantage offerings', + 'description': dedent("""\ + This module provides configuration options to setup ubuntu-advantage + subscriptions. + + .. note:: + Both ``commands`` value can be either a dictionary or a list. If + the configuration provided is a dictionary, the keys are only used + to order the execution of the commands and the dictionary is + merged with any vendor-data ubuntu-advantage configuration + provided. If a ``commands`` is provided as a list, any vendor-data + ubuntu-advantage ``commands`` are ignored. + + Ubuntu-advantage ``commands`` is a dictionary or list of + ubuntu-advantage commands to run on the deployed machine. + These commands can be used to enable or disable subscriptions to + various ubuntu-advantage products. See 'man ubuntu-advantage' for more + information on supported subcommands. + + .. note:: + Each command item can be a string or list. If the item is a list, + 'ubuntu-advantage' can be omitted and it will automatically be + inserted as part of the command. + """), + 'distros': distros, + 'examples': [dedent("""\ + # Enable Extended Security Maintenance using your service auth token + ubuntu-advantage: + commands: + 00: ubuntu-advantage enable-esm <token> + """), dedent("""\ + # Enable livepatch by providing your livepatch token + ubuntu-advantage: + commands: + 00: ubuntu-advantage enable-livepatch <livepatch-token> + + """), dedent("""\ + # Convenience: the ubuntu-advantage command can be omitted when + # specifying commands as a list and 'ubuntu-advantage' will + # automatically be prepended. + # The following commands are equivalent + ubuntu-advantage: + commands: + 00: ['enable-livepatch', 'my-token'] + 01: ['ubuntu-advantage', 'enable-livepatch', 'my-token'] + 02: ubuntu-advantage enable-livepatch my-token + 03: 'ubuntu-advantage enable-livepatch my-token' + """)], + 'frequency': PER_INSTANCE, + 'type': 'object', + 'properties': { + 'ubuntu-advantage': { + 'type': 'object', + 'properties': { + 'commands': { + 'type': ['object', 'array'], # Array of strings or dict + 'items': { + 'oneOf': [ + {'type': 'array', 'items': {'type': 'string'}}, + {'type': 'string'}] + }, + 'additionalItems': False, # Reject non-string & non-list + 'minItems': 1, + 'minProperties': 1, + 'uniqueItems': True + } + }, + 'additionalProperties': False, # Reject keys not in schema + 'required': ['commands'] + } + } +} + +# TODO schema for 'assertions' and 'commands' are too permissive at the moment. +# Once python-jsonschema supports schema draft 6 add support for arbitrary +# object keys with 'patternProperties' constraint to validate string values. + +__doc__ = get_schema_doc(schema) # Supplement python help() + +UA_CMD = "ubuntu-advantage" + + +def run_commands(commands): + """Run the commands provided in ubuntu-advantage:commands config. + + Commands are run individually. Any errors are collected and reported + after attempting all commands. + + @param commands: A list or dict containing commands to run. Keys of a + dict will be used to order the commands provided as dict values. + """ + if not commands: + return + LOG.debug('Running user-provided ubuntu-advantage commands') + if isinstance(commands, dict): + # Sort commands based on dictionary key + commands = [v for _, v in sorted(commands.items())] + elif not isinstance(commands, list): + raise TypeError( + 'commands parameter was not a list or dict: {commands}'.format( + commands=commands)) + + fixed_ua_commands = prepend_base_command('ubuntu-advantage', commands) + + cmd_failures = [] + for command in fixed_ua_commands: + shell = isinstance(command, str) + try: + util.subp(command, shell=shell, status_cb=sys.stderr.write) + except util.ProcessExecutionError as e: + cmd_failures.append(str(e)) + if cmd_failures: + msg = ( + 'Failures running ubuntu-advantage commands:\n' + '{cmd_failures}'.format( + cmd_failures=cmd_failures)) + util.logexc(LOG, msg) + raise RuntimeError(msg) + + +def maybe_install_ua_tools(cloud): + """Install ubuntu-advantage-tools if not present.""" + if util.which('ubuntu-advantage'): + return + try: + cloud.distro.update_package_sources() + except Exception as e: + util.logexc(LOG, "Package update failed") + raise + try: + cloud.distro.install_packages(['ubuntu-advantage-tools']) + except Exception as e: + util.logexc(LOG, "Failed to install ubuntu-advantage-tools") + raise + + +def handle(name, cfg, cloud, log, args): + cfgin = cfg.get('ubuntu-advantage') + if cfgin is None: + LOG.debug(("Skipping module named %s," + " no 'ubuntu-advantage' key in configuration"), name) + return + + validate_cloudconfig_schema(cfg, schema) + maybe_install_ua_tools(cloud) + run_commands(cfgin.get('commands', [])) + +# vi: ts=4 expandtab diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py index cb1205e9..988e7f7c 100644 --- a/cloudinit/config/tests/test_snap.py +++ b/cloudinit/config/tests/test_snap.py @@ -4,8 +4,8 @@ import re from six import StringIO from cloudinit.config.cc_snap import ( - ASSERTIONS_FILE, add_assertions, handle, prepend_snap_commands, - maybe_install_squashfuse, run_commands, schema) + ASSERTIONS_FILE, add_assertions, handle, maybe_install_squashfuse, + run_commands, schema) from cloudinit.config.schema import validate_cloudconfig_schema from cloudinit import util from cloudinit.tests.helpers import CiTestCase, mock, wrap_and_call @@ -158,54 +158,6 @@ class TestAddAssertions(CiTestCase): util.load_file(compare_file), util.load_file(assert_file)) -class TestPrepentSnapCommands(CiTestCase): - - with_logs = True - - def test_prepend_snap_commands_errors_on_neither_string_nor_list(self): - """Raise an error for each command which is not a string or list.""" - orig_commands = ['ls', 1, {'not': 'gonna work'}, ['snap', 'list']] - with self.assertRaises(TypeError) as context_manager: - prepend_snap_commands(orig_commands) - self.assertEqual( - "Invalid snap config. These commands are not a string or list:\n" - "1\n{'not': 'gonna work'}", - str(context_manager.exception)) - - def test_prepend_snap_commands_warns_on_non_snap_string_commands(self): - """Warn on each non-snap for commands of type string.""" - orig_commands = ['ls', 'snap list', 'touch /blah', 'snap install x'] - fixed_commands = prepend_snap_commands(orig_commands) - self.assertEqual( - 'WARNING: Non-snap commands in snap config:\n' - 'ls\ntouch /blah\n', - self.logs.getvalue()) - self.assertEqual(orig_commands, fixed_commands) - - def test_prepend_snap_commands_prepends_on_non_snap_list_commands(self): - """Prepend 'snap' for each non-snap command of type list.""" - orig_commands = [['ls'], ['snap', 'list'], ['snapa', '/blah'], - ['snap', 'install', 'x']] - expected = [['snap', 'ls'], ['snap', 'list'], - ['snap', 'snapa', '/blah'], - ['snap', 'install', 'x']] - fixed_commands = prepend_snap_commands(orig_commands) - self.assertEqual('', self.logs.getvalue()) - self.assertEqual(expected, fixed_commands) - - def test_prepend_snap_commands_removes_first_item_when_none(self): - """Remove the first element of a non-snap command when it is None.""" - orig_commands = [[None, 'ls'], ['snap', 'list'], - [None, 'touch', '/blah'], - ['snap', 'install', 'x']] - expected = [['ls'], ['snap', 'list'], - ['touch', '/blah'], - ['snap', 'install', 'x']] - fixed_commands = prepend_snap_commands(orig_commands) - self.assertEqual('', self.logs.getvalue()) - self.assertEqual(expected, fixed_commands) - - class TestRunCommands(CiTestCase): with_logs = True diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py new file mode 100644 index 00000000..0eeadd43 --- /dev/null +++ b/cloudinit/config/tests/test_ubuntu_advantage.py @@ -0,0 +1,268 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import re +from six import StringIO + +from cloudinit.config.cc_ubuntu_advantage import ( + handle, maybe_install_ua_tools, run_commands, schema) +from cloudinit.config.schema import validate_cloudconfig_schema +from cloudinit import util +from cloudinit.tests.helpers import CiTestCase, mock + + +# Module path used in mocks +MPATH = 'cloudinit.config.cc_ubuntu_advantage' + + +class FakeCloud(object): + def __init__(self, distro): + self.distro = distro + + +class TestRunCommands(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestRunCommands, self).setUp() + self.tmp = self.tmp_dir() + + @mock.patch('%s.util.subp' % MPATH) + def test_run_commands_on_empty_list(self, m_subp): + """When provided with an empty list, run_commands does nothing.""" + run_commands([]) + self.assertEqual('', self.logs.getvalue()) + m_subp.assert_not_called() + + def test_run_commands_on_non_list_or_dict(self): + """When provided an invalid type, run_commands raises an error.""" + with self.assertRaises(TypeError) as context_manager: + run_commands(commands="I'm Not Valid") + self.assertEqual( + "commands parameter was not a list or dict: I'm Not Valid", + str(context_manager.exception)) + + def test_run_command_logs_commands_and_exit_codes_to_stderr(self): + """All exit codes are logged to stderr.""" + outfile = self.tmp_path('output.log', dir=self.tmp) + + cmd1 = 'echo "HI" >> %s' % outfile + cmd2 = 'bogus command' + cmd3 = 'echo "MOM" >> %s' % outfile + commands = [cmd1, cmd2, cmd3] + + mock_path = '%s.sys.stderr' % MPATH + with mock.patch(mock_path, new_callable=StringIO) as m_stderr: + with self.assertRaises(RuntimeError) as context_manager: + run_commands(commands=commands) + + self.assertIsNotNone( + re.search(r'bogus: (command )?not found', + str(context_manager.exception)), + msg='Expected bogus command not found') + expected_stderr_log = '\n'.join([ + 'Begin run command: {cmd}'.format(cmd=cmd1), + 'End run command: exit(0)', + 'Begin run command: {cmd}'.format(cmd=cmd2), + 'ERROR: End run command: exit(127)', + 'Begin run command: {cmd}'.format(cmd=cmd3), + 'End run command: exit(0)\n']) + self.assertEqual(expected_stderr_log, m_stderr.getvalue()) + + def test_run_command_as_lists(self): + """When commands are specified as a list, run them in order.""" + outfile = self.tmp_path('output.log', dir=self.tmp) + + cmd1 = 'echo "HI" >> %s' % outfile + cmd2 = 'echo "MOM" >> %s' % outfile + commands = [cmd1, cmd2] + with mock.patch('%s.sys.stderr' % MPATH, new_callable=StringIO): + run_commands(commands=commands) + + self.assertIn( + 'DEBUG: Running user-provided ubuntu-advantage commands', + self.logs.getvalue()) + self.assertEqual('HI\nMOM\n', util.load_file(outfile)) + self.assertIn( + 'WARNING: Non-ubuntu-advantage commands in ubuntu-advantage' + ' config:', + self.logs.getvalue()) + + def test_run_command_dict_sorted_as_command_script(self): + """When commands are a dict, sort them and run.""" + outfile = self.tmp_path('output.log', dir=self.tmp) + cmd1 = 'echo "HI" >> %s' % outfile + cmd2 = 'echo "MOM" >> %s' % outfile + commands = {'02': cmd1, '01': cmd2} + with mock.patch('%s.sys.stderr' % MPATH, new_callable=StringIO): + run_commands(commands=commands) + + expected_messages = [ + 'DEBUG: Running user-provided ubuntu-advantage commands'] + for message in expected_messages: + self.assertIn(message, self.logs.getvalue()) + self.assertEqual('MOM\nHI\n', util.load_file(outfile)) + + +class TestSchema(CiTestCase): + + with_logs = True + + def test_schema_warns_on_ubuntu_advantage_not_as_dict(self): + """If ubuntu-advantage configuration is not a dict, emit a warning.""" + validate_cloudconfig_schema({'ubuntu-advantage': 'wrong type'}, schema) + self.assertEqual( + "WARNING: Invalid config:\nubuntu-advantage: 'wrong type' is not" + " of type 'object'\n", + self.logs.getvalue()) + + @mock.patch('%s.run_commands' % MPATH) + def test_schema_disallows_unknown_keys(self, _): + """Unknown keys in ubuntu-advantage configuration emit warnings.""" + validate_cloudconfig_schema( + {'ubuntu-advantage': {'commands': ['ls'], 'invalid-key': ''}}, + schema) + self.assertIn( + 'WARNING: Invalid config:\nubuntu-advantage: Additional properties' + " are not allowed ('invalid-key' was unexpected)", + self.logs.getvalue()) + + def test_warn_schema_requires_commands(self): + """Warn when ubuntu-advantage configuration lacks commands.""" + validate_cloudconfig_schema( + {'ubuntu-advantage': {}}, schema) + self.assertEqual( + "WARNING: Invalid config:\nubuntu-advantage: 'commands' is a" + " required property\n", + self.logs.getvalue()) + + @mock.patch('%s.run_commands' % MPATH) + def test_warn_schema_commands_is_not_list_or_dict(self, _): + """Warn when ubuntu-advantage:commands config is not a list or dict.""" + validate_cloudconfig_schema( + {'ubuntu-advantage': {'commands': 'broken'}}, schema) + self.assertEqual( + "WARNING: Invalid config:\nubuntu-advantage.commands: 'broken' is" + " not of type 'object', 'array'\n", + self.logs.getvalue()) + + @mock.patch('%s.run_commands' % MPATH) + def test_warn_schema_when_commands_is_empty(self, _): + """Emit warnings when ubuntu-advantage:commands is empty.""" + validate_cloudconfig_schema( + {'ubuntu-advantage': {'commands': []}}, schema) + validate_cloudconfig_schema( + {'ubuntu-advantage': {'commands': {}}}, schema) + self.assertEqual( + "WARNING: Invalid config:\nubuntu-advantage.commands: [] is too" + " short\nWARNING: Invalid config:\nubuntu-advantage.commands: {}" + " does not have enough properties\n", + self.logs.getvalue()) + + @mock.patch('%s.run_commands' % MPATH) + def test_schema_when_commands_are_list_or_dict(self, _): + """No warnings when ubuntu-advantage:commands are a list or dict.""" + validate_cloudconfig_schema( + {'ubuntu-advantage': {'commands': ['valid']}}, schema) + validate_cloudconfig_schema( + {'ubuntu-advantage': {'commands': {'01': 'also valid'}}}, schema) + self.assertEqual('', self.logs.getvalue()) + + +class TestHandle(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestHandle, self).setUp() + self.tmp = self.tmp_dir() + + @mock.patch('%s.run_commands' % MPATH) + @mock.patch('%s.validate_cloudconfig_schema' % MPATH) + def test_handle_no_config(self, m_schema, m_run): + """When no ua-related configuration is provided, nothing happens.""" + cfg = {} + handle('ua-test', cfg=cfg, cloud=None, log=self.logger, args=None) + self.assertIn( + "DEBUG: Skipping module named ua-test, no 'ubuntu-advantage' key" + " in config", + self.logs.getvalue()) + m_schema.assert_not_called() + m_run.assert_not_called() + + @mock.patch('%s.maybe_install_ua_tools' % MPATH) + def test_handle_tries_to_install_ubuntu_advantage_tools(self, m_install): + """If ubuntu_advantage is provided, try installing ua-tools package.""" + cfg = {'ubuntu-advantage': {}} + mycloud = FakeCloud(None) + handle('nomatter', cfg=cfg, cloud=mycloud, log=self.logger, args=None) + m_install.assert_called_once_with(mycloud) + + @mock.patch('%s.maybe_install_ua_tools' % MPATH) + def test_handle_runs_commands_provided(self, m_install): + """When commands are specified as a list, run them.""" + outfile = self.tmp_path('output.log', dir=self.tmp) + + cfg = { + 'ubuntu-advantage': {'commands': ['echo "HI" >> %s' % outfile, + 'echo "MOM" >> %s' % outfile]}} + mock_path = '%s.sys.stderr' % MPATH + with mock.patch(mock_path, new_callable=StringIO): + handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None) + self.assertEqual('HI\nMOM\n', util.load_file(outfile)) + + +class TestMaybeInstallUATools(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestMaybeInstallUATools, self).setUp() + self.tmp = self.tmp_dir() + + @mock.patch('%s.util.which' % MPATH) + def test_maybe_install_ua_tools_noop_when_ua_tools_present(self, m_which): + """Do nothing if ubuntu-advantage-tools already exists.""" + m_which.return_value = '/usr/bin/ubuntu-advantage' # already installed + distro = mock.MagicMock() + distro.update_package_sources.side_effect = RuntimeError( + 'Some apt error') + maybe_install_ua_tools(cloud=FakeCloud(distro)) # No RuntimeError + + @mock.patch('%s.util.which' % MPATH) + def test_maybe_install_ua_tools_raises_update_errors(self, m_which): + """maybe_install_ua_tools logs and raises apt update errors.""" + m_which.return_value = None + distro = mock.MagicMock() + distro.update_package_sources.side_effect = RuntimeError( + 'Some apt error') + with self.assertRaises(RuntimeError) as context_manager: + maybe_install_ua_tools(cloud=FakeCloud(distro)) + self.assertEqual('Some apt error', str(context_manager.exception)) + self.assertIn('Package update failed\nTraceback', self.logs.getvalue()) + + @mock.patch('%s.util.which' % MPATH) + def test_maybe_install_ua_raises_install_errors(self, m_which): + """maybe_install_ua_tools logs and raises package install errors.""" + m_which.return_value = None + distro = mock.MagicMock() + distro.update_package_sources.return_value = None + distro.install_packages.side_effect = RuntimeError( + 'Some install error') + with self.assertRaises(RuntimeError) as context_manager: + maybe_install_ua_tools(cloud=FakeCloud(distro)) + self.assertEqual('Some install error', str(context_manager.exception)) + self.assertIn( + 'Failed to install ubuntu-advantage-tools\n', self.logs.getvalue()) + + @mock.patch('%s.util.which' % MPATH) + def test_maybe_install_ua_tools_happy_path(self, m_which): + """maybe_install_ua_tools installs ubuntu-advantage-tools.""" + m_which.return_value = None + distro = mock.MagicMock() # No errors raised + maybe_install_ua_tools(cloud=FakeCloud(distro)) + distro.update_package_sources.assert_called_once_with() + distro.install_packages.assert_called_once_with( + ['ubuntu-advantage-tools']) + +# vi: ts=4 expandtab diff --git a/cloudinit/subp.py b/cloudinit/subp.py new file mode 100644 index 00000000..0ad09306 --- /dev/null +++ b/cloudinit/subp.py @@ -0,0 +1,57 @@ +# This file is part of cloud-init. See LICENSE file for license information. +"""Common utility functions for interacting with subprocess.""" + +# TODO move subp shellify and runparts related functions out of util.py + +import logging + +LOG = logging.getLogger(__name__) + + +def prepend_base_command(base_command, commands): + """Ensure user-provided commands start with base_command; warn otherwise. + + Each command is either a list or string. Perform the following: + - If the command is a list, pop the first element if it is None + - If the command is a list, insert base_command as the first element if + not present. + - When the command is a string not starting with 'base-command', warn. + + Allow flexibility to provide non-base-command environment/config setup if + needed. + + @commands: List of commands. Each command element is a list or string. + + @return: List of 'fixed up' commands. + @raise: TypeError on invalid config item type. + """ + warnings = [] + errors = [] + fixed_commands = [] + for command in commands: + if isinstance(command, list): + if command[0] is None: # Avoid warnings by specifying None + command = command[1:] + elif command[0] != base_command: # Automatically prepend + command.insert(0, base_command) + elif isinstance(command, str): + if not command.startswith('%s ' % base_command): + warnings.append(command) + else: + errors.append(str(command)) + continue + fixed_commands.append(command) + + if warnings: + LOG.warning( + 'Non-%s commands in %s config:\n%s', + base_command, base_command, '\n'.join(warnings)) + if errors: + raise TypeError( + 'Invalid {name} config.' + ' These commands are not a string or list:\n{errors}'.format( + name=base_command, errors='\n'.join(errors))) + return fixed_commands + + +# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_subp.py b/cloudinit/tests/test_subp.py new file mode 100644 index 00000000..448097d3 --- /dev/null +++ b/cloudinit/tests/test_subp.py @@ -0,0 +1,61 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +"""Tests for cloudinit.subp utility functions""" + +from cloudinit import subp +from cloudinit.tests.helpers import CiTestCase + + +class TestPrependBaseCommands(CiTestCase): + + with_logs = True + + def test_prepend_base_command_errors_on_neither_string_nor_list(self): + """Raise an error for each command which is not a string or list.""" + orig_commands = ['ls', 1, {'not': 'gonna work'}, ['basecmd', 'list']] + with self.assertRaises(TypeError) as context_manager: + subp.prepend_base_command( + base_command='basecmd', commands=orig_commands) + self.assertEqual( + "Invalid basecmd config. These commands are not a string or" + " list:\n1\n{'not': 'gonna work'}", + str(context_manager.exception)) + + def test_prepend_base_command_warns_on_non_base_string_commands(self): + """Warn on each non-base for commands of type string.""" + orig_commands = [ + 'ls', 'basecmd list', 'touch /blah', 'basecmd install x'] + fixed_commands = subp.prepend_base_command( + base_command='basecmd', commands=orig_commands) + self.assertEqual( + 'WARNING: Non-basecmd commands in basecmd config:\n' + 'ls\ntouch /blah\n', + self.logs.getvalue()) + self.assertEqual(orig_commands, fixed_commands) + + def test_prepend_base_command_prepends_on_non_base_list_commands(self): + """Prepend 'basecmd' for each non-basecmd command of type list.""" + orig_commands = [['ls'], ['basecmd', 'list'], ['basecmda', '/blah'], + ['basecmd', 'install', 'x']] + expected = [['basecmd', 'ls'], ['basecmd', 'list'], + ['basecmd', 'basecmda', '/blah'], + ['basecmd', 'install', 'x']] + fixed_commands = subp.prepend_base_command( + base_command='basecmd', commands=orig_commands) + self.assertEqual('', self.logs.getvalue()) + self.assertEqual(expected, fixed_commands) + + def test_prepend_base_command_removes_first_item_when_none(self): + """Remove the first element of a non-basecmd when it is None.""" + orig_commands = [[None, 'ls'], ['basecmd', 'list'], + [None, 'touch', '/blah'], + ['basecmd', 'install', 'x']] + expected = [['ls'], ['basecmd', 'list'], + ['touch', '/blah'], + ['basecmd', 'install', 'x']] + fixed_commands = subp.prepend_base_command( + base_command='basecmd', commands=orig_commands) + self.assertEqual('', self.logs.getvalue()) + self.assertEqual(expected, fixed_commands) + +# vi: ts=4 expandtab |