summaryrefslogtreecommitdiff
path: root/cloudinit
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit')
-rw-r--r--cloudinit/config/cc_snap.py47
-rw-r--r--cloudinit/config/cc_ubuntu_advantage.py173
-rw-r--r--cloudinit/config/tests/test_snap.py52
-rw-r--r--cloudinit/config/tests/test_ubuntu_advantage.py268
-rw-r--r--cloudinit/subp.py57
-rw-r--r--cloudinit/tests/test_subp.py61
6 files changed, 563 insertions, 95 deletions
diff --git a/cloudinit/config/cc_snap.py b/cloudinit/config/cc_snap.py
index db965291..34a53fd4 100644
--- a/cloudinit/config/cc_snap.py
+++ b/cloudinit/config/cc_snap.py
@@ -11,6 +11,7 @@ from cloudinit import log as logging
from cloudinit.config.schema import (
get_schema_doc, validate_cloudconfig_schema)
from cloudinit.settings import PER_INSTANCE
+from cloudinit.subp import prepend_base_command
from cloudinit import util
@@ -160,50 +161,6 @@ def add_assertions(assertions):
util.subp(snap_cmd + [ASSERTIONS_FILE], capture=True)
-def prepend_snap_commands(commands):
- """Ensure user-provided commands start with SNAP_CMD, warn otherwise.
-
- Each command is either a list or string. Perform the following:
- - When the command is a list, pop the first element if it is None
- - When the command is a list, insert SNAP_CMD as the first element if
- not present.
- - When the command is a string containing a non-snap command, warn.
-
- Support cut-n-paste snap command sets from public snappy documentation.
- Allow flexibility to provide non-snap environment/config setup if needed.
-
- @commands: List of commands. Each command element is a list or string.
-
- @return: List of 'fixed up' snap commands.
- @raise: TypeError on invalid config item type.
- """
- warnings = []
- errors = []
- fixed_commands = []
- for command in commands:
- if isinstance(command, list):
- if command[0] is None: # Avoid warnings by specifying None
- command = command[1:]
- elif command[0] != SNAP_CMD: # Automatically prepend SNAP_CMD
- command.insert(0, SNAP_CMD)
- elif isinstance(command, str):
- if not command.startswith('%s ' % SNAP_CMD):
- warnings.append(command)
- else:
- errors.append(str(command))
- continue
- fixed_commands.append(command)
-
- if warnings:
- LOG.warning(
- 'Non-snap commands in snap config:\n%s', '\n'.join(warnings))
- if errors:
- raise TypeError(
- 'Invalid snap config.'
- ' These commands are not a string or list:\n' + '\n'.join(errors))
- return fixed_commands
-
-
def run_commands(commands):
"""Run the provided commands provided in snap:commands configuration.
@@ -224,7 +181,7 @@ def run_commands(commands):
'commands parameter was not a list or dict: {commands}'.format(
commands=commands))
- fixed_snap_commands = prepend_snap_commands(commands)
+ fixed_snap_commands = prepend_base_command('snap', commands)
cmd_failures = []
for command in fixed_snap_commands:
diff --git a/cloudinit/config/cc_ubuntu_advantage.py b/cloudinit/config/cc_ubuntu_advantage.py
new file mode 100644
index 00000000..16b1868b
--- /dev/null
+++ b/cloudinit/config/cc_ubuntu_advantage.py
@@ -0,0 +1,173 @@
+# Copyright (C) 2018 Canonical Ltd.
+#
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Ubuntu advantage: manage ubuntu-advantage offerings from Canonical."""
+
+import sys
+from textwrap import dedent
+
+from cloudinit import log as logging
+from cloudinit.config.schema import (
+ get_schema_doc, validate_cloudconfig_schema)
+from cloudinit.settings import PER_INSTANCE
+from cloudinit.subp import prepend_base_command
+from cloudinit import util
+
+
+distros = ['ubuntu']
+frequency = PER_INSTANCE
+
+LOG = logging.getLogger(__name__)
+
+schema = {
+ 'id': 'cc_ubuntu_advantage',
+ 'name': 'Ubuntu Advantage',
+ 'title': 'Install, configure and manage ubuntu-advantage offerings',
+ 'description': dedent("""\
+ This module provides configuration options to setup ubuntu-advantage
+ subscriptions.
+
+ .. note::
+ Both ``commands`` value can be either a dictionary or a list. If
+ the configuration provided is a dictionary, the keys are only used
+ to order the execution of the commands and the dictionary is
+ merged with any vendor-data ubuntu-advantage configuration
+ provided. If a ``commands`` is provided as a list, any vendor-data
+ ubuntu-advantage ``commands`` are ignored.
+
+ Ubuntu-advantage ``commands`` is a dictionary or list of
+ ubuntu-advantage commands to run on the deployed machine.
+ These commands can be used to enable or disable subscriptions to
+ various ubuntu-advantage products. See 'man ubuntu-advantage' for more
+ information on supported subcommands.
+
+ .. note::
+ Each command item can be a string or list. If the item is a list,
+ 'ubuntu-advantage' can be omitted and it will automatically be
+ inserted as part of the command.
+ """),
+ 'distros': distros,
+ 'examples': [dedent("""\
+ # Enable Extended Security Maintenance using your service auth token
+ ubuntu-advantage:
+ commands:
+ 00: ubuntu-advantage enable-esm <token>
+ """), dedent("""\
+ # Enable livepatch by providing your livepatch token
+ ubuntu-advantage:
+ commands:
+ 00: ubuntu-advantage enable-livepatch <livepatch-token>
+
+ """), dedent("""\
+ # Convenience: the ubuntu-advantage command can be omitted when
+ # specifying commands as a list and 'ubuntu-advantage' will
+ # automatically be prepended.
+ # The following commands are equivalent
+ ubuntu-advantage:
+ commands:
+ 00: ['enable-livepatch', 'my-token']
+ 01: ['ubuntu-advantage', 'enable-livepatch', 'my-token']
+ 02: ubuntu-advantage enable-livepatch my-token
+ 03: 'ubuntu-advantage enable-livepatch my-token'
+ """)],
+ 'frequency': PER_INSTANCE,
+ 'type': 'object',
+ 'properties': {
+ 'ubuntu-advantage': {
+ 'type': 'object',
+ 'properties': {
+ 'commands': {
+ 'type': ['object', 'array'], # Array of strings or dict
+ 'items': {
+ 'oneOf': [
+ {'type': 'array', 'items': {'type': 'string'}},
+ {'type': 'string'}]
+ },
+ 'additionalItems': False, # Reject non-string & non-list
+ 'minItems': 1,
+ 'minProperties': 1,
+ 'uniqueItems': True
+ }
+ },
+ 'additionalProperties': False, # Reject keys not in schema
+ 'required': ['commands']
+ }
+ }
+}
+
+# TODO schema for 'assertions' and 'commands' are too permissive at the moment.
+# Once python-jsonschema supports schema draft 6 add support for arbitrary
+# object keys with 'patternProperties' constraint to validate string values.
+
+__doc__ = get_schema_doc(schema) # Supplement python help()
+
+UA_CMD = "ubuntu-advantage"
+
+
+def run_commands(commands):
+ """Run the commands provided in ubuntu-advantage:commands config.
+
+ Commands are run individually. Any errors are collected and reported
+ after attempting all commands.
+
+ @param commands: A list or dict containing commands to run. Keys of a
+ dict will be used to order the commands provided as dict values.
+ """
+ if not commands:
+ return
+ LOG.debug('Running user-provided ubuntu-advantage commands')
+ if isinstance(commands, dict):
+ # Sort commands based on dictionary key
+ commands = [v for _, v in sorted(commands.items())]
+ elif not isinstance(commands, list):
+ raise TypeError(
+ 'commands parameter was not a list or dict: {commands}'.format(
+ commands=commands))
+
+ fixed_ua_commands = prepend_base_command('ubuntu-advantage', commands)
+
+ cmd_failures = []
+ for command in fixed_ua_commands:
+ shell = isinstance(command, str)
+ try:
+ util.subp(command, shell=shell, status_cb=sys.stderr.write)
+ except util.ProcessExecutionError as e:
+ cmd_failures.append(str(e))
+ if cmd_failures:
+ msg = (
+ 'Failures running ubuntu-advantage commands:\n'
+ '{cmd_failures}'.format(
+ cmd_failures=cmd_failures))
+ util.logexc(LOG, msg)
+ raise RuntimeError(msg)
+
+
+def maybe_install_ua_tools(cloud):
+ """Install ubuntu-advantage-tools if not present."""
+ if util.which('ubuntu-advantage'):
+ return
+ try:
+ cloud.distro.update_package_sources()
+ except Exception as e:
+ util.logexc(LOG, "Package update failed")
+ raise
+ try:
+ cloud.distro.install_packages(['ubuntu-advantage-tools'])
+ except Exception as e:
+ util.logexc(LOG, "Failed to install ubuntu-advantage-tools")
+ raise
+
+
+def handle(name, cfg, cloud, log, args):
+ cfgin = cfg.get('ubuntu-advantage')
+ if cfgin is None:
+ LOG.debug(("Skipping module named %s,"
+ " no 'ubuntu-advantage' key in configuration"), name)
+ return
+
+ validate_cloudconfig_schema(cfg, schema)
+ maybe_install_ua_tools(cloud)
+ run_commands(cfgin.get('commands', []))
+
+# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_snap.py b/cloudinit/config/tests/test_snap.py
index cb1205e9..988e7f7c 100644
--- a/cloudinit/config/tests/test_snap.py
+++ b/cloudinit/config/tests/test_snap.py
@@ -4,8 +4,8 @@ import re
from six import StringIO
from cloudinit.config.cc_snap import (
- ASSERTIONS_FILE, add_assertions, handle, prepend_snap_commands,
- maybe_install_squashfuse, run_commands, schema)
+ ASSERTIONS_FILE, add_assertions, handle, maybe_install_squashfuse,
+ run_commands, schema)
from cloudinit.config.schema import validate_cloudconfig_schema
from cloudinit import util
from cloudinit.tests.helpers import CiTestCase, mock, wrap_and_call
@@ -158,54 +158,6 @@ class TestAddAssertions(CiTestCase):
util.load_file(compare_file), util.load_file(assert_file))
-class TestPrepentSnapCommands(CiTestCase):
-
- with_logs = True
-
- def test_prepend_snap_commands_errors_on_neither_string_nor_list(self):
- """Raise an error for each command which is not a string or list."""
- orig_commands = ['ls', 1, {'not': 'gonna work'}, ['snap', 'list']]
- with self.assertRaises(TypeError) as context_manager:
- prepend_snap_commands(orig_commands)
- self.assertEqual(
- "Invalid snap config. These commands are not a string or list:\n"
- "1\n{'not': 'gonna work'}",
- str(context_manager.exception))
-
- def test_prepend_snap_commands_warns_on_non_snap_string_commands(self):
- """Warn on each non-snap for commands of type string."""
- orig_commands = ['ls', 'snap list', 'touch /blah', 'snap install x']
- fixed_commands = prepend_snap_commands(orig_commands)
- self.assertEqual(
- 'WARNING: Non-snap commands in snap config:\n'
- 'ls\ntouch /blah\n',
- self.logs.getvalue())
- self.assertEqual(orig_commands, fixed_commands)
-
- def test_prepend_snap_commands_prepends_on_non_snap_list_commands(self):
- """Prepend 'snap' for each non-snap command of type list."""
- orig_commands = [['ls'], ['snap', 'list'], ['snapa', '/blah'],
- ['snap', 'install', 'x']]
- expected = [['snap', 'ls'], ['snap', 'list'],
- ['snap', 'snapa', '/blah'],
- ['snap', 'install', 'x']]
- fixed_commands = prepend_snap_commands(orig_commands)
- self.assertEqual('', self.logs.getvalue())
- self.assertEqual(expected, fixed_commands)
-
- def test_prepend_snap_commands_removes_first_item_when_none(self):
- """Remove the first element of a non-snap command when it is None."""
- orig_commands = [[None, 'ls'], ['snap', 'list'],
- [None, 'touch', '/blah'],
- ['snap', 'install', 'x']]
- expected = [['ls'], ['snap', 'list'],
- ['touch', '/blah'],
- ['snap', 'install', 'x']]
- fixed_commands = prepend_snap_commands(orig_commands)
- self.assertEqual('', self.logs.getvalue())
- self.assertEqual(expected, fixed_commands)
-
-
class TestRunCommands(CiTestCase):
with_logs = True
diff --git a/cloudinit/config/tests/test_ubuntu_advantage.py b/cloudinit/config/tests/test_ubuntu_advantage.py
new file mode 100644
index 00000000..0eeadd43
--- /dev/null
+++ b/cloudinit/config/tests/test_ubuntu_advantage.py
@@ -0,0 +1,268 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import re
+from six import StringIO
+
+from cloudinit.config.cc_ubuntu_advantage import (
+ handle, maybe_install_ua_tools, run_commands, schema)
+from cloudinit.config.schema import validate_cloudconfig_schema
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase, mock
+
+
+# Module path used in mocks
+MPATH = 'cloudinit.config.cc_ubuntu_advantage'
+
+
+class FakeCloud(object):
+ def __init__(self, distro):
+ self.distro = distro
+
+
+class TestRunCommands(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestRunCommands, self).setUp()
+ self.tmp = self.tmp_dir()
+
+ @mock.patch('%s.util.subp' % MPATH)
+ def test_run_commands_on_empty_list(self, m_subp):
+ """When provided with an empty list, run_commands does nothing."""
+ run_commands([])
+ self.assertEqual('', self.logs.getvalue())
+ m_subp.assert_not_called()
+
+ def test_run_commands_on_non_list_or_dict(self):
+ """When provided an invalid type, run_commands raises an error."""
+ with self.assertRaises(TypeError) as context_manager:
+ run_commands(commands="I'm Not Valid")
+ self.assertEqual(
+ "commands parameter was not a list or dict: I'm Not Valid",
+ str(context_manager.exception))
+
+ def test_run_command_logs_commands_and_exit_codes_to_stderr(self):
+ """All exit codes are logged to stderr."""
+ outfile = self.tmp_path('output.log', dir=self.tmp)
+
+ cmd1 = 'echo "HI" >> %s' % outfile
+ cmd2 = 'bogus command'
+ cmd3 = 'echo "MOM" >> %s' % outfile
+ commands = [cmd1, cmd2, cmd3]
+
+ mock_path = '%s.sys.stderr' % MPATH
+ with mock.patch(mock_path, new_callable=StringIO) as m_stderr:
+ with self.assertRaises(RuntimeError) as context_manager:
+ run_commands(commands=commands)
+
+ self.assertIsNotNone(
+ re.search(r'bogus: (command )?not found',
+ str(context_manager.exception)),
+ msg='Expected bogus command not found')
+ expected_stderr_log = '\n'.join([
+ 'Begin run command: {cmd}'.format(cmd=cmd1),
+ 'End run command: exit(0)',
+ 'Begin run command: {cmd}'.format(cmd=cmd2),
+ 'ERROR: End run command: exit(127)',
+ 'Begin run command: {cmd}'.format(cmd=cmd3),
+ 'End run command: exit(0)\n'])
+ self.assertEqual(expected_stderr_log, m_stderr.getvalue())
+
+ def test_run_command_as_lists(self):
+ """When commands are specified as a list, run them in order."""
+ outfile = self.tmp_path('output.log', dir=self.tmp)
+
+ cmd1 = 'echo "HI" >> %s' % outfile
+ cmd2 = 'echo "MOM" >> %s' % outfile
+ commands = [cmd1, cmd2]
+ with mock.patch('%s.sys.stderr' % MPATH, new_callable=StringIO):
+ run_commands(commands=commands)
+
+ self.assertIn(
+ 'DEBUG: Running user-provided ubuntu-advantage commands',
+ self.logs.getvalue())
+ self.assertEqual('HI\nMOM\n', util.load_file(outfile))
+ self.assertIn(
+ 'WARNING: Non-ubuntu-advantage commands in ubuntu-advantage'
+ ' config:',
+ self.logs.getvalue())
+
+ def test_run_command_dict_sorted_as_command_script(self):
+ """When commands are a dict, sort them and run."""
+ outfile = self.tmp_path('output.log', dir=self.tmp)
+ cmd1 = 'echo "HI" >> %s' % outfile
+ cmd2 = 'echo "MOM" >> %s' % outfile
+ commands = {'02': cmd1, '01': cmd2}
+ with mock.patch('%s.sys.stderr' % MPATH, new_callable=StringIO):
+ run_commands(commands=commands)
+
+ expected_messages = [
+ 'DEBUG: Running user-provided ubuntu-advantage commands']
+ for message in expected_messages:
+ self.assertIn(message, self.logs.getvalue())
+ self.assertEqual('MOM\nHI\n', util.load_file(outfile))
+
+
+class TestSchema(CiTestCase):
+
+ with_logs = True
+
+ def test_schema_warns_on_ubuntu_advantage_not_as_dict(self):
+ """If ubuntu-advantage configuration is not a dict, emit a warning."""
+ validate_cloudconfig_schema({'ubuntu-advantage': 'wrong type'}, schema)
+ self.assertEqual(
+ "WARNING: Invalid config:\nubuntu-advantage: 'wrong type' is not"
+ " of type 'object'\n",
+ self.logs.getvalue())
+
+ @mock.patch('%s.run_commands' % MPATH)
+ def test_schema_disallows_unknown_keys(self, _):
+ """Unknown keys in ubuntu-advantage configuration emit warnings."""
+ validate_cloudconfig_schema(
+ {'ubuntu-advantage': {'commands': ['ls'], 'invalid-key': ''}},
+ schema)
+ self.assertIn(
+ 'WARNING: Invalid config:\nubuntu-advantage: Additional properties'
+ " are not allowed ('invalid-key' was unexpected)",
+ self.logs.getvalue())
+
+ def test_warn_schema_requires_commands(self):
+ """Warn when ubuntu-advantage configuration lacks commands."""
+ validate_cloudconfig_schema(
+ {'ubuntu-advantage': {}}, schema)
+ self.assertEqual(
+ "WARNING: Invalid config:\nubuntu-advantage: 'commands' is a"
+ " required property\n",
+ self.logs.getvalue())
+
+ @mock.patch('%s.run_commands' % MPATH)
+ def test_warn_schema_commands_is_not_list_or_dict(self, _):
+ """Warn when ubuntu-advantage:commands config is not a list or dict."""
+ validate_cloudconfig_schema(
+ {'ubuntu-advantage': {'commands': 'broken'}}, schema)
+ self.assertEqual(
+ "WARNING: Invalid config:\nubuntu-advantage.commands: 'broken' is"
+ " not of type 'object', 'array'\n",
+ self.logs.getvalue())
+
+ @mock.patch('%s.run_commands' % MPATH)
+ def test_warn_schema_when_commands_is_empty(self, _):
+ """Emit warnings when ubuntu-advantage:commands is empty."""
+ validate_cloudconfig_schema(
+ {'ubuntu-advantage': {'commands': []}}, schema)
+ validate_cloudconfig_schema(
+ {'ubuntu-advantage': {'commands': {}}}, schema)
+ self.assertEqual(
+ "WARNING: Invalid config:\nubuntu-advantage.commands: [] is too"
+ " short\nWARNING: Invalid config:\nubuntu-advantage.commands: {}"
+ " does not have enough properties\n",
+ self.logs.getvalue())
+
+ @mock.patch('%s.run_commands' % MPATH)
+ def test_schema_when_commands_are_list_or_dict(self, _):
+ """No warnings when ubuntu-advantage:commands are a list or dict."""
+ validate_cloudconfig_schema(
+ {'ubuntu-advantage': {'commands': ['valid']}}, schema)
+ validate_cloudconfig_schema(
+ {'ubuntu-advantage': {'commands': {'01': 'also valid'}}}, schema)
+ self.assertEqual('', self.logs.getvalue())
+
+
+class TestHandle(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestHandle, self).setUp()
+ self.tmp = self.tmp_dir()
+
+ @mock.patch('%s.run_commands' % MPATH)
+ @mock.patch('%s.validate_cloudconfig_schema' % MPATH)
+ def test_handle_no_config(self, m_schema, m_run):
+ """When no ua-related configuration is provided, nothing happens."""
+ cfg = {}
+ handle('ua-test', cfg=cfg, cloud=None, log=self.logger, args=None)
+ self.assertIn(
+ "DEBUG: Skipping module named ua-test, no 'ubuntu-advantage' key"
+ " in config",
+ self.logs.getvalue())
+ m_schema.assert_not_called()
+ m_run.assert_not_called()
+
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH)
+ def test_handle_tries_to_install_ubuntu_advantage_tools(self, m_install):
+ """If ubuntu_advantage is provided, try installing ua-tools package."""
+ cfg = {'ubuntu-advantage': {}}
+ mycloud = FakeCloud(None)
+ handle('nomatter', cfg=cfg, cloud=mycloud, log=self.logger, args=None)
+ m_install.assert_called_once_with(mycloud)
+
+ @mock.patch('%s.maybe_install_ua_tools' % MPATH)
+ def test_handle_runs_commands_provided(self, m_install):
+ """When commands are specified as a list, run them."""
+ outfile = self.tmp_path('output.log', dir=self.tmp)
+
+ cfg = {
+ 'ubuntu-advantage': {'commands': ['echo "HI" >> %s' % outfile,
+ 'echo "MOM" >> %s' % outfile]}}
+ mock_path = '%s.sys.stderr' % MPATH
+ with mock.patch(mock_path, new_callable=StringIO):
+ handle('nomatter', cfg=cfg, cloud=None, log=self.logger, args=None)
+ self.assertEqual('HI\nMOM\n', util.load_file(outfile))
+
+
+class TestMaybeInstallUATools(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestMaybeInstallUATools, self).setUp()
+ self.tmp = self.tmp_dir()
+
+ @mock.patch('%s.util.which' % MPATH)
+ def test_maybe_install_ua_tools_noop_when_ua_tools_present(self, m_which):
+ """Do nothing if ubuntu-advantage-tools already exists."""
+ m_which.return_value = '/usr/bin/ubuntu-advantage' # already installed
+ distro = mock.MagicMock()
+ distro.update_package_sources.side_effect = RuntimeError(
+ 'Some apt error')
+ maybe_install_ua_tools(cloud=FakeCloud(distro)) # No RuntimeError
+
+ @mock.patch('%s.util.which' % MPATH)
+ def test_maybe_install_ua_tools_raises_update_errors(self, m_which):
+ """maybe_install_ua_tools logs and raises apt update errors."""
+ m_which.return_value = None
+ distro = mock.MagicMock()
+ distro.update_package_sources.side_effect = RuntimeError(
+ 'Some apt error')
+ with self.assertRaises(RuntimeError) as context_manager:
+ maybe_install_ua_tools(cloud=FakeCloud(distro))
+ self.assertEqual('Some apt error', str(context_manager.exception))
+ self.assertIn('Package update failed\nTraceback', self.logs.getvalue())
+
+ @mock.patch('%s.util.which' % MPATH)
+ def test_maybe_install_ua_raises_install_errors(self, m_which):
+ """maybe_install_ua_tools logs and raises package install errors."""
+ m_which.return_value = None
+ distro = mock.MagicMock()
+ distro.update_package_sources.return_value = None
+ distro.install_packages.side_effect = RuntimeError(
+ 'Some install error')
+ with self.assertRaises(RuntimeError) as context_manager:
+ maybe_install_ua_tools(cloud=FakeCloud(distro))
+ self.assertEqual('Some install error', str(context_manager.exception))
+ self.assertIn(
+ 'Failed to install ubuntu-advantage-tools\n', self.logs.getvalue())
+
+ @mock.patch('%s.util.which' % MPATH)
+ def test_maybe_install_ua_tools_happy_path(self, m_which):
+ """maybe_install_ua_tools installs ubuntu-advantage-tools."""
+ m_which.return_value = None
+ distro = mock.MagicMock() # No errors raised
+ maybe_install_ua_tools(cloud=FakeCloud(distro))
+ distro.update_package_sources.assert_called_once_with()
+ distro.install_packages.assert_called_once_with(
+ ['ubuntu-advantage-tools'])
+
+# vi: ts=4 expandtab
diff --git a/cloudinit/subp.py b/cloudinit/subp.py
new file mode 100644
index 00000000..0ad09306
--- /dev/null
+++ b/cloudinit/subp.py
@@ -0,0 +1,57 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+"""Common utility functions for interacting with subprocess."""
+
+# TODO move subp shellify and runparts related functions out of util.py
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+def prepend_base_command(base_command, commands):
+ """Ensure user-provided commands start with base_command; warn otherwise.
+
+ Each command is either a list or string. Perform the following:
+ - If the command is a list, pop the first element if it is None
+ - If the command is a list, insert base_command as the first element if
+ not present.
+ - When the command is a string not starting with 'base-command', warn.
+
+ Allow flexibility to provide non-base-command environment/config setup if
+ needed.
+
+ @commands: List of commands. Each command element is a list or string.
+
+ @return: List of 'fixed up' commands.
+ @raise: TypeError on invalid config item type.
+ """
+ warnings = []
+ errors = []
+ fixed_commands = []
+ for command in commands:
+ if isinstance(command, list):
+ if command[0] is None: # Avoid warnings by specifying None
+ command = command[1:]
+ elif command[0] != base_command: # Automatically prepend
+ command.insert(0, base_command)
+ elif isinstance(command, str):
+ if not command.startswith('%s ' % base_command):
+ warnings.append(command)
+ else:
+ errors.append(str(command))
+ continue
+ fixed_commands.append(command)
+
+ if warnings:
+ LOG.warning(
+ 'Non-%s commands in %s config:\n%s',
+ base_command, base_command, '\n'.join(warnings))
+ if errors:
+ raise TypeError(
+ 'Invalid {name} config.'
+ ' These commands are not a string or list:\n{errors}'.format(
+ name=base_command, errors='\n'.join(errors)))
+ return fixed_commands
+
+
+# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_subp.py b/cloudinit/tests/test_subp.py
new file mode 100644
index 00000000..448097d3
--- /dev/null
+++ b/cloudinit/tests/test_subp.py
@@ -0,0 +1,61 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Tests for cloudinit.subp utility functions"""
+
+from cloudinit import subp
+from cloudinit.tests.helpers import CiTestCase
+
+
+class TestPrependBaseCommands(CiTestCase):
+
+ with_logs = True
+
+ def test_prepend_base_command_errors_on_neither_string_nor_list(self):
+ """Raise an error for each command which is not a string or list."""
+ orig_commands = ['ls', 1, {'not': 'gonna work'}, ['basecmd', 'list']]
+ with self.assertRaises(TypeError) as context_manager:
+ subp.prepend_base_command(
+ base_command='basecmd', commands=orig_commands)
+ self.assertEqual(
+ "Invalid basecmd config. These commands are not a string or"
+ " list:\n1\n{'not': 'gonna work'}",
+ str(context_manager.exception))
+
+ def test_prepend_base_command_warns_on_non_base_string_commands(self):
+ """Warn on each non-base for commands of type string."""
+ orig_commands = [
+ 'ls', 'basecmd list', 'touch /blah', 'basecmd install x']
+ fixed_commands = subp.prepend_base_command(
+ base_command='basecmd', commands=orig_commands)
+ self.assertEqual(
+ 'WARNING: Non-basecmd commands in basecmd config:\n'
+ 'ls\ntouch /blah\n',
+ self.logs.getvalue())
+ self.assertEqual(orig_commands, fixed_commands)
+
+ def test_prepend_base_command_prepends_on_non_base_list_commands(self):
+ """Prepend 'basecmd' for each non-basecmd command of type list."""
+ orig_commands = [['ls'], ['basecmd', 'list'], ['basecmda', '/blah'],
+ ['basecmd', 'install', 'x']]
+ expected = [['basecmd', 'ls'], ['basecmd', 'list'],
+ ['basecmd', 'basecmda', '/blah'],
+ ['basecmd', 'install', 'x']]
+ fixed_commands = subp.prepend_base_command(
+ base_command='basecmd', commands=orig_commands)
+ self.assertEqual('', self.logs.getvalue())
+ self.assertEqual(expected, fixed_commands)
+
+ def test_prepend_base_command_removes_first_item_when_none(self):
+ """Remove the first element of a non-basecmd when it is None."""
+ orig_commands = [[None, 'ls'], ['basecmd', 'list'],
+ [None, 'touch', '/blah'],
+ ['basecmd', 'install', 'x']]
+ expected = [['ls'], ['basecmd', 'list'],
+ ['touch', '/blah'],
+ ['basecmd', 'install', 'x']]
+ fixed_commands = subp.prepend_base_command(
+ base_command='basecmd', commands=orig_commands)
+ self.assertEqual('', self.logs.getvalue())
+ self.assertEqual(expected, fixed_commands)
+
+# vi: ts=4 expandtab