diff options
Diffstat (limited to 'cloudinit/tests')
-rw-r--r-- | cloudinit/tests/__init__.py | 0 | ||||
-rw-r--r-- | cloudinit/tests/helpers.py | 504 | ||||
-rw-r--r-- | cloudinit/tests/test_conftest.py | 65 | ||||
-rw-r--r-- | cloudinit/tests/test_dhclient_hook.py | 105 | ||||
-rw-r--r-- | cloudinit/tests/test_dmi.py | 154 | ||||
-rw-r--r-- | cloudinit/tests/test_features.py | 60 | ||||
-rw-r--r-- | cloudinit/tests/test_gpg.py | 55 | ||||
-rw-r--r-- | cloudinit/tests/test_netinfo.py | 181 | ||||
-rw-r--r-- | cloudinit/tests/test_persistence.py | 127 | ||||
-rw-r--r-- | cloudinit/tests/test_simpletable.py | 106 | ||||
-rw-r--r-- | cloudinit/tests/test_stages.py | 406 | ||||
-rw-r--r-- | cloudinit/tests/test_subp.py | 286 | ||||
-rw-r--r-- | cloudinit/tests/test_temp_utils.py | 117 | ||||
-rw-r--r-- | cloudinit/tests/test_upgrade.py | 45 | ||||
-rw-r--r-- | cloudinit/tests/test_url_helper.py | 174 | ||||
-rw-r--r-- | cloudinit/tests/test_util.py | 854 | ||||
-rw-r--r-- | cloudinit/tests/test_version.py | 31 |
17 files changed, 0 insertions, 3270 deletions
diff --git a/cloudinit/tests/__init__.py b/cloudinit/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/cloudinit/tests/__init__.py +++ /dev/null diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py deleted file mode 100644 index 58f63b69..00000000 --- a/cloudinit/tests/helpers.py +++ /dev/null @@ -1,504 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -import functools -import httpretty -import io -import logging -import os -import random -import shutil -import string -import sys -import tempfile -import time -import unittest -from contextlib import ExitStack, contextmanager -from unittest import mock -from unittest.util import strclass - -from cloudinit.config.schema import ( - SchemaValidationError, validate_cloudconfig_schema) -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers as ch -from cloudinit.sources import DataSourceNone -from cloudinit.templater import JINJA_AVAILABLE -from cloudinit import subp -from cloudinit import util - -_real_subp = subp.subp - -# Used for skipping tests -SkipTest = unittest.SkipTest -skipIf = unittest.skipIf - - -# Makes the old path start -# with new base instead of whatever -# it previously had -def rebase_path(old_path, new_base): - if old_path.startswith(new_base): - # Already handled... - return old_path - # Retarget the base of that path - # to the new base instead of the - # old one... - path = os.path.join(new_base, old_path.lstrip("/")) - path = os.path.abspath(path) - return path - - -# Can work on anything that takes a path as arguments -def retarget_many_wrapper(new_base, am, old_func): - def wrapper(*args, **kwds): - n_args = list(args) - nam = am - if am == -1: - nam = len(n_args) - for i in range(0, nam): - path = args[i] - # patchOS() wraps various os and os.path functions, however in - # Python 3 some of these now accept file-descriptors (integers). - # That breaks rebase_path() so in lieu of a better solution, just - # don't rebase if we get a fd. - if isinstance(path, str): - n_args[i] = rebase_path(path, new_base) - return old_func(*n_args, **kwds) - return wrapper - - -class TestCase(unittest.TestCase): - - def reset_global_state(self): - """Reset any global state to its original settings. - - cloudinit caches some values in cloudinit.util. Unit tests that - involved those cached paths were then subject to failure if the order - of invocation changed (LP: #1703697). - - This function resets any of these global state variables to their - initial state. - - In the future this should really be done with some registry that - can then be cleaned in a more obvious way. - """ - util.PROC_CMDLINE = None - util._DNS_REDIRECT_IP = None - util._LSB_RELEASE = {} - - def setUp(self): - super(TestCase, self).setUp() - self.reset_global_state() - - def shortDescription(self): - return strclass(self.__class__) + '.' + self._testMethodName - - def add_patch(self, target, attr, *args, **kwargs): - """Patches specified target object and sets it as attr on test - instance also schedules cleanup""" - if 'autospec' not in kwargs: - kwargs['autospec'] = True - m = mock.patch(target, *args, **kwargs) - p = m.start() - self.addCleanup(m.stop) - setattr(self, attr, p) - - -class CiTestCase(TestCase): - """This is the preferred test case base class unless user - needs other test case classes below.""" - - # Subclass overrides for specific test behavior - # Whether or not a unit test needs logfile setup - with_logs = False - allowed_subp = False - SUBP_SHELL_TRUE = "shell=true" - - @contextmanager - def allow_subp(self, allowed_subp): - orig = self.allowed_subp - try: - self.allowed_subp = allowed_subp - yield - finally: - self.allowed_subp = orig - - def setUp(self): - super(CiTestCase, self).setUp() - if self.with_logs: - # Create a log handler so unit tests can search expected logs. - self.logger = logging.getLogger() - self.logs = io.StringIO() - formatter = logging.Formatter('%(levelname)s: %(message)s') - handler = logging.StreamHandler(self.logs) - handler.setFormatter(formatter) - self.old_handlers = self.logger.handlers - self.logger.handlers = [handler] - if self.allowed_subp is True: - subp.subp = _real_subp - else: - subp.subp = self._fake_subp - - def _fake_subp(self, *args, **kwargs): - if 'args' in kwargs: - cmd = kwargs['args'] - else: - if not args: - raise TypeError( - "subp() missing 1 required positional argument: 'args'") - cmd = args[0] - - if not isinstance(cmd, str): - cmd = cmd[0] - pass_through = False - if not isinstance(self.allowed_subp, (list, bool)): - raise TypeError("self.allowed_subp supports list or bool.") - if isinstance(self.allowed_subp, bool): - pass_through = self.allowed_subp - else: - pass_through = ( - (cmd in self.allowed_subp) or - (self.SUBP_SHELL_TRUE in self.allowed_subp and - kwargs.get('shell'))) - if pass_through: - return _real_subp(*args, **kwargs) - raise Exception( - "called subp. set self.allowed_subp=True to allow\n subp(%s)" % - ', '.join([str(repr(a)) for a in args] + - ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()])) - - def tearDown(self): - if self.with_logs: - # Remove the handler we setup - logging.getLogger().handlers = self.old_handlers - logging.getLogger().level = None - subp.subp = _real_subp - super(CiTestCase, self).tearDown() - - def tmp_dir(self, dir=None, cleanup=True): - # return a full path to a temporary directory that will be cleaned up. - if dir is None: - tmpd = tempfile.mkdtemp( - prefix="ci-%s." % self.__class__.__name__) - else: - tmpd = tempfile.mkdtemp(dir=dir) - self.addCleanup( - functools.partial(shutil.rmtree, tmpd, ignore_errors=True)) - return tmpd - - def tmp_path(self, path, dir=None): - # return an absolute path to 'path' under dir. - # if dir is None, one will be created with tmp_dir() - # the file is not created or modified. - if dir is None: - dir = self.tmp_dir() - return os.path.normpath(os.path.abspath(os.path.join(dir, path))) - - def tmp_cloud(self, distro, sys_cfg=None, metadata=None): - """Create a cloud with tmp working directory paths. - - @param distro: Name of the distro to attach to the cloud. - @param metadata: Optional metadata to set on the datasource. - - @return: The built cloud instance. - """ - self.new_root = self.tmp_dir() - if not sys_cfg: - sys_cfg = {} - tmp_paths = {} - for var in ['templates_dir', 'run_dir', 'cloud_dir']: - tmp_paths[var] = self.tmp_path(var, dir=self.new_root) - util.ensure_dir(tmp_paths[var]) - self.paths = ch.Paths(tmp_paths) - cls = distros.fetch(distro) - mydist = cls(distro, sys_cfg, self.paths) - myds = DataSourceNone.DataSourceNone(sys_cfg, mydist, self.paths) - if metadata: - myds.metadata.update(metadata) - return cloud.Cloud(myds, self.paths, sys_cfg, mydist, None) - - @classmethod - def random_string(cls, length=8): - """ return a random lowercase string with default length of 8""" - return ''.join( - random.choice(string.ascii_lowercase) for _ in range(length)) - - -class ResourceUsingTestCase(CiTestCase): - - def setUp(self): - super(ResourceUsingTestCase, self).setUp() - self.resource_path = None - - def getCloudPaths(self, ds=None): - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - cp = ch.Paths({'cloud_dir': tmpdir, - 'templates_dir': resourceLocation()}, - ds=ds) - return cp - - -class FilesystemMockingTestCase(ResourceUsingTestCase): - - def setUp(self): - super(FilesystemMockingTestCase, self).setUp() - self.patched_funcs = ExitStack() - - def tearDown(self): - self.patched_funcs.close() - ResourceUsingTestCase.tearDown(self) - - def replicateTestRoot(self, example_root, target_root): - real_root = resourceLocation() - real_root = os.path.join(real_root, 'roots', example_root) - for (dir_path, _dirnames, filenames) in os.walk(real_root): - real_path = dir_path - make_path = rebase_path(real_path[len(real_root):], target_root) - util.ensure_dir(make_path) - for f in filenames: - real_path = util.abs_join(real_path, f) - make_path = util.abs_join(make_path, f) - shutil.copy(real_path, make_path) - - def patchUtils(self, new_root): - patch_funcs = { - util: [('write_file', 1), - ('append_file', 1), - ('load_file', 1), - ('ensure_dir', 1), - ('chmod', 1), - ('delete_dir_contents', 1), - ('del_file', 1), - ('sym_link', -1), - ('copy', -1)], - } - for (mod, funcs) in patch_funcs.items(): - for (f, am) in funcs: - func = getattr(mod, f) - trap_func = retarget_many_wrapper(new_root, am, func) - self.patched_funcs.enter_context( - mock.patch.object(mod, f, trap_func)) - - # Handle subprocess calls - func = getattr(subp, 'subp') - - def nsubp(*_args, **_kwargs): - return ('', '') - - self.patched_funcs.enter_context( - mock.patch.object(subp, 'subp', nsubp)) - - def null_func(*_args, **_kwargs): - return None - - for f in ['chownbyid', 'chownbyname']: - self.patched_funcs.enter_context( - mock.patch.object(util, f, null_func)) - - def patchOS(self, new_root): - patch_funcs = { - os.path: [('isfile', 1), ('exists', 1), - ('islink', 1), ('isdir', 1), ('lexists', 1)], - os: [('listdir', 1), ('mkdir', 1), - ('lstat', 1), ('symlink', 2), - ('stat', 1)] - } - - if hasattr(os, 'scandir'): - # py27 does not have scandir - patch_funcs[os].append(('scandir', 1)) - - for (mod, funcs) in patch_funcs.items(): - for f, nargs in funcs: - func = getattr(mod, f) - trap_func = retarget_many_wrapper(new_root, nargs, func) - self.patched_funcs.enter_context( - mock.patch.object(mod, f, trap_func)) - - def patchOpen(self, new_root): - trap_func = retarget_many_wrapper(new_root, 1, open) - self.patched_funcs.enter_context( - mock.patch('builtins.open', trap_func) - ) - - def patchStdoutAndStderr(self, stdout=None, stderr=None): - if stdout is not None: - self.patched_funcs.enter_context( - mock.patch.object(sys, 'stdout', stdout)) - if stderr is not None: - self.patched_funcs.enter_context( - mock.patch.object(sys, 'stderr', stderr)) - - def reRoot(self, root=None): - if root is None: - root = self.tmp_dir() - self.patchUtils(root) - self.patchOS(root) - self.patchOpen(root) - return root - - @contextmanager - def reRooted(self, root=None): - try: - yield self.reRoot(root) - finally: - self.patched_funcs.close() - - -class HttprettyTestCase(CiTestCase): - # necessary as http_proxy gets in the way of httpretty - # https://github.com/gabrielfalcao/HTTPretty/issues/122 - # Also make sure that allow_net_connect is set to False. - # And make sure reset and enable/disable are done. - - def setUp(self): - self.restore_proxy = os.environ.get('http_proxy') - if self.restore_proxy is not None: - del os.environ['http_proxy'] - super(HttprettyTestCase, self).setUp() - httpretty.HTTPretty.allow_net_connect = False - httpretty.reset() - httpretty.enable() - - def tearDown(self): - httpretty.disable() - httpretty.reset() - if self.restore_proxy: - os.environ['http_proxy'] = self.restore_proxy - super(HttprettyTestCase, self).tearDown() - - -class SchemaTestCaseMixin(unittest.TestCase): - - def assertSchemaValid(self, cfg, msg="Valid Schema failed validation."): - """Assert the config is valid per self.schema. - - If there is only one top level key in the schema properties, then - the cfg will be put under that key.""" - props = list(self.schema.get('properties')) - # put cfg under top level key if there is only one in the schema - if len(props) == 1: - cfg = {props[0]: cfg} - try: - validate_cloudconfig_schema(cfg, self.schema, strict=True) - except SchemaValidationError: - self.fail(msg) - - -def populate_dir(path, files): - if not os.path.exists(path): - os.makedirs(path) - ret = [] - for (name, content) in files.items(): - p = os.path.sep.join([path, name]) - util.ensure_dir(os.path.dirname(p)) - with open(p, "wb") as fp: - if isinstance(content, bytes): - fp.write(content) - else: - fp.write(content.encode('utf-8')) - fp.close() - ret.append(p) - - return ret - - -def populate_dir_with_ts(path, data): - """data is {'file': ('contents', mtime)}. mtime relative to now.""" - populate_dir(path, dict((k, v[0]) for k, v in data.items())) - btime = time.time() - for fpath, (_contents, mtime) in data.items(): - ts = btime + mtime if mtime else btime - os.utime(os.path.sep.join((path, fpath)), (ts, ts)) - - -def dir2dict(startdir, prefix=None): - flist = {} - if prefix is None: - prefix = startdir - for root, _dirs, files in os.walk(startdir): - for fname in files: - fpath = os.path.join(root, fname) - key = fpath[len(prefix):] - flist[key] = util.load_file(fpath) - return flist - - -def wrap_and_call(prefix, mocks, func, *args, **kwargs): - """ - call func(args, **kwargs) with mocks applied, then unapplies mocks - nicer to read than repeating dectorators on each function - - prefix: prefix for mock names (e.g. 'cloudinit.stages.util') or None - mocks: dictionary of names (under 'prefix') to mock and either - a return value or a dictionary to pass to the mock.patch call - func: function to call with mocks applied - *args,**kwargs: arguments for 'func' - - return_value: return from 'func' - """ - delim = '.' - if prefix is None: - prefix = '' - prefix = prefix.rstrip(delim) - unwraps = [] - for fname, kw in mocks.items(): - if prefix: - fname = delim.join((prefix, fname)) - if not isinstance(kw, dict): - kw = {'return_value': kw} - p = mock.patch(fname, **kw) - p.start() - unwraps.append(p) - try: - return func(*args, **kwargs) - finally: - for p in unwraps: - p.stop() - - -def resourceLocation(subname=None): - path = os.path.join('tests', 'data') - if not subname: - return path - return os.path.join(path, subname) - - -def readResource(name, mode='r'): - with open(resourceLocation(name), mode) as fh: - return fh.read() - - -try: - import jsonschema - assert jsonschema # avoid pyflakes error F401: import unused - _missing_jsonschema_dep = False -except ImportError: - _missing_jsonschema_dep = True - - -def skipUnlessJsonSchema(): - return skipIf( - _missing_jsonschema_dep, "No python-jsonschema dependency present.") - - -def skipUnlessJinja(): - return skipIf(not JINJA_AVAILABLE, "No jinja dependency present.") - - -def skipIfJinja(): - return skipIf(JINJA_AVAILABLE, "Jinja dependency present.") - - -# older versions of mock do not have the useful 'assert_not_called' -if not hasattr(mock.Mock, 'assert_not_called'): - def __mock_assert_not_called(mmock): - if mmock.call_count != 0: - msg = ("[citest] Expected '%s' to not have been called. " - "Called %s times." % - (mmock._mock_name or 'mock', mmock.call_count)) - raise AssertionError(msg) - mock.Mock.assert_not_called = __mock_assert_not_called - -# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_conftest.py b/cloudinit/tests/test_conftest.py deleted file mode 100644 index 6f1263a5..00000000 --- a/cloudinit/tests/test_conftest.py +++ /dev/null @@ -1,65 +0,0 @@ -import pytest - -from cloudinit import subp -from cloudinit.tests.helpers import CiTestCase - - -class TestDisableSubpUsage: - """Test that the disable_subp_usage fixture behaves as expected.""" - - def test_using_subp_raises_assertion_error(self): - with pytest.raises(AssertionError): - subp.subp(["some", "args"]) - - def test_typeerrors_on_incorrect_usage(self): - with pytest.raises(TypeError): - # We are intentionally passing no value for a parameter, so: - # pylint: disable=no-value-for-parameter - subp.subp() - - @pytest.mark.allow_all_subp - def test_subp_usage_can_be_reenabled(self): - subp.subp(['whoami']) - - @pytest.mark.allow_subp_for("whoami") - def test_subp_usage_can_be_conditionally_reenabled(self): - # The two parameters test each potential invocation with a single - # argument - with pytest.raises(AssertionError) as excinfo: - subp.subp(["some", "args"]) - assert "allowed: whoami" in str(excinfo.value) - subp.subp(['whoami']) - - @pytest.mark.allow_subp_for("whoami", "bash") - def test_subp_usage_can_be_conditionally_reenabled_for_multiple_cmds(self): - with pytest.raises(AssertionError) as excinfo: - subp.subp(["some", "args"]) - assert "allowed: whoami,bash" in str(excinfo.value) - subp.subp(['bash', '-c', 'true']) - subp.subp(['whoami']) - - @pytest.mark.allow_all_subp - @pytest.mark.allow_subp_for("bash") - def test_both_marks_raise_an_error(self): - with pytest.raises(AssertionError, match="marked both"): - subp.subp(["bash"]) - - -class TestDisableSubpUsageInTestSubclass(CiTestCase): - """Test that disable_subp_usage doesn't impact CiTestCase's subp logic.""" - - def test_using_subp_raises_exception(self): - with pytest.raises(Exception): - subp.subp(["some", "args"]) - - def test_typeerrors_on_incorrect_usage(self): - with pytest.raises(TypeError): - subp.subp() - - def test_subp_usage_can_be_reenabled(self): - _old_allowed_subp = self.allow_subp - self.allowed_subp = True - try: - subp.subp(['bash', '-c', 'true']) - finally: - self.allowed_subp = _old_allowed_subp diff --git a/cloudinit/tests/test_dhclient_hook.py b/cloudinit/tests/test_dhclient_hook.py deleted file mode 100644 index eadae81c..00000000 --- a/cloudinit/tests/test_dhclient_hook.py +++ /dev/null @@ -1,105 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -"""Tests for cloudinit.dhclient_hook.""" - -from cloudinit import dhclient_hook as dhc -from cloudinit.tests.helpers import CiTestCase, dir2dict, populate_dir - -import argparse -import json -import os -from unittest import mock - - -class TestDhclientHook(CiTestCase): - - ex_env = { - 'interface': 'eth0', - 'new_dhcp_lease_time': '3600', - 'new_host_name': 'x1', - 'new_ip_address': '10.145.210.163', - 'new_subnet_mask': '255.255.255.0', - 'old_host_name': 'x1', - 'PATH': '/usr/sbin:/usr/bin:/sbin:/bin', - 'pid': '614', - 'reason': 'BOUND', - } - - # some older versions of dhclient put the same content, - # but in upper case with DHCP4_ instead of new_ - ex_env_dhcp4 = { - 'REASON': 'BOUND', - 'DHCP4_dhcp_lease_time': '3600', - 'DHCP4_host_name': 'x1', - 'DHCP4_ip_address': '10.145.210.163', - 'DHCP4_subnet_mask': '255.255.255.0', - 'INTERFACE': 'eth0', - 'PATH': '/usr/sbin:/usr/bin:/sbin:/bin', - 'pid': '614', - } - - expected = { - 'dhcp_lease_time': '3600', - 'host_name': 'x1', - 'ip_address': '10.145.210.163', - 'subnet_mask': '255.255.255.0'} - - def setUp(self): - super(TestDhclientHook, self).setUp() - self.tmp = self.tmp_dir() - - def test_handle_args(self): - """quick test of call to handle_args.""" - nic = 'eth0' - args = argparse.Namespace(event=dhc.UP, interface=nic) - with mock.patch.dict("os.environ", clear=True, values=self.ex_env): - dhc.handle_args(dhc.NAME, args, data_d=self.tmp) - found = dir2dict(self.tmp + os.path.sep) - self.assertEqual([nic + ".json"], list(found.keys())) - self.assertEqual(self.expected, json.loads(found[nic + ".json"])) - - def test_run_hook_up_creates_dir(self): - """If dir does not exist, run_hook should create it.""" - subd = self.tmp_path("subdir", self.tmp) - nic = 'eth1' - dhc.run_hook(nic, 'up', data_d=subd, env=self.ex_env) - self.assertEqual( - set([nic + ".json"]), set(dir2dict(subd + os.path.sep))) - - def test_run_hook_up(self): - """Test expected use of run_hook_up.""" - nic = 'eth0' - dhc.run_hook(nic, 'up', data_d=self.tmp, env=self.ex_env) - found = dir2dict(self.tmp + os.path.sep) - self.assertEqual([nic + ".json"], list(found.keys())) - self.assertEqual(self.expected, json.loads(found[nic + ".json"])) - - def test_run_hook_up_dhcp4_prefix(self): - """Test run_hook filters correctly with older DHCP4_ data.""" - nic = 'eth0' - dhc.run_hook(nic, 'up', data_d=self.tmp, env=self.ex_env_dhcp4) - found = dir2dict(self.tmp + os.path.sep) - self.assertEqual([nic + ".json"], list(found.keys())) - self.assertEqual(self.expected, json.loads(found[nic + ".json"])) - - def test_run_hook_down_deletes(self): - """down should delete the created json file.""" - nic = 'eth1' - populate_dir( - self.tmp, {nic + ".json": "{'abcd'}", 'myfile.txt': 'text'}) - dhc.run_hook(nic, 'down', data_d=self.tmp, env={'old_host_name': 'x1'}) - self.assertEqual( - set(['myfile.txt']), - set(dir2dict(self.tmp + os.path.sep))) - - def test_get_parser(self): - """Smoke test creation of get_parser.""" - # cloud-init main uses 'action'. - event, interface = (dhc.UP, 'mynic0') - self.assertEqual( - argparse.Namespace(event=event, interface=interface, - action=(dhc.NAME, dhc.handle_args)), - dhc.get_parser().parse_args([event, interface])) - - -# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_dmi.py b/cloudinit/tests/test_dmi.py deleted file mode 100644 index 78a72122..00000000 --- a/cloudinit/tests/test_dmi.py +++ /dev/null @@ -1,154 +0,0 @@ -from cloudinit.tests import helpers -from cloudinit import dmi -from cloudinit import util -from cloudinit import subp - -import os -import tempfile -import shutil -from unittest import mock - - -class TestReadDMIData(helpers.FilesystemMockingTestCase): - - def setUp(self): - super(TestReadDMIData, self).setUp() - self.new_root = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.new_root) - self.reRoot(self.new_root) - p = mock.patch("cloudinit.dmi.is_container", return_value=False) - self.addCleanup(p.stop) - self._m_is_container = p.start() - p = mock.patch("cloudinit.dmi.is_FreeBSD", return_value=False) - self.addCleanup(p.stop) - self._m_is_FreeBSD = p.start() - - def _create_sysfs_parent_directory(self): - util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id')) - - def _create_sysfs_file(self, key, content): - """Mocks the sys path found on Linux systems.""" - self._create_sysfs_parent_directory() - dmi_key = "/sys/class/dmi/id/{0}".format(key) - util.write_file(dmi_key, content) - - def _configure_dmidecode_return(self, key, content, error=None): - """ - In order to test a missing sys path and call outs to dmidecode, this - function fakes the results of dmidecode to test the results. - """ - def _dmidecode_subp(cmd): - if cmd[-1] != key: - raise subp.ProcessExecutionError() - return (content, error) - - self.patched_funcs.enter_context( - mock.patch("cloudinit.dmi.subp.which", side_effect=lambda _: True)) - self.patched_funcs.enter_context( - mock.patch("cloudinit.dmi.subp.subp", side_effect=_dmidecode_subp)) - - def _configure_kenv_return(self, key, content, error=None): - """ - In order to test a FreeBSD system call outs to kenv, this - function fakes the results of kenv to test the results. - """ - def _kenv_subp(cmd): - if cmd[-1] != dmi.DMIDECODE_TO_KERNEL[key].freebsd: - raise subp.ProcessExecutionError() - return (content, error) - - self.patched_funcs.enter_context( - mock.patch("cloudinit.dmi.subp.subp", side_effect=_kenv_subp)) - - def patch_mapping(self, new_mapping): - self.patched_funcs.enter_context( - mock.patch('cloudinit.dmi.DMIDECODE_TO_KERNEL', - new_mapping)) - - def test_sysfs_used_with_key_in_mapping_and_file_on_disk(self): - self.patch_mapping({'mapped-key': dmi.kdmi('mapped-value', None)}) - expected_dmi_value = 'sys-used-correctly' - self._create_sysfs_file('mapped-value', expected_dmi_value) - self._configure_dmidecode_return('mapped-key', 'wrong-wrong-wrong') - self.assertEqual(expected_dmi_value, dmi.read_dmi_data('mapped-key')) - - def test_dmidecode_used_if_no_sysfs_file_on_disk(self): - self.patch_mapping({}) - self._create_sysfs_parent_directory() - expected_dmi_value = 'dmidecode-used' - self._configure_dmidecode_return('use-dmidecode', expected_dmi_value) - with mock.patch("cloudinit.util.os.uname") as m_uname: - m_uname.return_value = ('x-sysname', 'x-nodename', - 'x-release', 'x-version', 'x86_64') - self.assertEqual(expected_dmi_value, - dmi.read_dmi_data('use-dmidecode')) - - def test_dmidecode_not_used_on_arm(self): - self.patch_mapping({}) - print("current =%s", subp) - self._create_sysfs_parent_directory() - dmi_val = 'from-dmidecode' - dmi_name = 'use-dmidecode' - self._configure_dmidecode_return(dmi_name, dmi_val) - print("now =%s", subp) - - expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val} - found = {} - # we do not run the 'dmi-decode' binary on some arches - # verify that anything requested that is not in the sysfs dir - # will return None on those arches. - with mock.patch("cloudinit.util.os.uname") as m_uname: - for arch in expected: - m_uname.return_value = ('x-sysname', 'x-nodename', - 'x-release', 'x-version', arch) - print("now2 =%s", subp) - found[arch] = dmi.read_dmi_data(dmi_name) - self.assertEqual(expected, found) - - def test_none_returned_if_neither_source_has_data(self): - self.patch_mapping({}) - self._configure_dmidecode_return('key', 'value') - self.assertIsNone(dmi.read_dmi_data('expect-fail')) - - def test_none_returned_if_dmidecode_not_in_path(self): - self.patched_funcs.enter_context( - mock.patch.object(subp, 'which', lambda _: False)) - self.patch_mapping({}) - self.assertIsNone(dmi.read_dmi_data('expect-fail')) - - def test_empty_string_returned_instead_of_foxfox(self): - # uninitialized dmi values show as \xff, return empty string - my_len = 32 - dmi_value = b'\xff' * my_len + b'\n' - expected = "" - dmi_key = 'system-product-name' - sysfs_key = 'product_name' - self._create_sysfs_file(sysfs_key, dmi_value) - self.assertEqual(expected, dmi.read_dmi_data(dmi_key)) - - def test_container_returns_none(self): - """In a container read_dmi_data should always return None.""" - - # first verify we get the value if not in container - self._m_is_container.return_value = False - key, val = ("system-product-name", "my_product") - self._create_sysfs_file('product_name', val) - self.assertEqual(val, dmi.read_dmi_data(key)) - - # then verify in container returns None - self._m_is_container.return_value = True - self.assertIsNone(dmi.read_dmi_data(key)) - - def test_container_returns_none_on_unknown(self): - """In a container even bogus keys return None.""" - self._m_is_container.return_value = True - self._create_sysfs_file('product_name', "should-be-ignored") - self.assertIsNone(dmi.read_dmi_data("bogus")) - self.assertIsNone(dmi.read_dmi_data("system-product-name")) - - def test_freebsd_uses_kenv(self): - """On a FreeBSD system, kenv is called.""" - self._m_is_FreeBSD.return_value = True - key, val = ("system-product-name", "my_product") - self._configure_kenv_return(key, val) - self.assertEqual(dmi.read_dmi_data(key), val) diff --git a/cloudinit/tests/test_features.py b/cloudinit/tests/test_features.py deleted file mode 100644 index d7a7226d..00000000 --- a/cloudinit/tests/test_features.py +++ /dev/null @@ -1,60 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. -# pylint: disable=no-member,no-name-in-module -""" -This file is for testing the feature flag functionality itself, -NOT for testing any individual feature flag -""" -import pytest -import sys -from pathlib import Path - -import cloudinit - - -@pytest.yield_fixture() -def create_override(request): - """ - Create a feature overrides file and do some module wizardry to make - it seem like we're importing the features file for the first time. - - After creating the override file with the values passed by the test, - we need to reload cloudinit.features - to get all of the current features (including the overridden ones). - Once the test is complete, we remove the file we created and set - features and feature_overrides modules to how they were before - the test started - """ - override_path = Path(cloudinit.__file__).parent / 'feature_overrides.py' - if override_path.exists(): - raise Exception("feature_overrides.py unexpectedly exists! " - "Remove it to run this test.") - with override_path.open('w') as f: - for key, value in request.param.items(): - f.write('{} = {}\n'.format(key, value)) - - sys.modules.pop('cloudinit.features', None) - - yield - - override_path.unlink() - sys.modules.pop('cloudinit.feature_overrides', None) - - -class TestFeatures: - def test_feature_without_override(self): - from cloudinit.features import ERROR_ON_USER_DATA_FAILURE - assert ERROR_ON_USER_DATA_FAILURE is True - - @pytest.mark.parametrize('create_override', - [{'ERROR_ON_USER_DATA_FAILURE': False}], - indirect=True) - def test_feature_with_override(self, create_override): - from cloudinit.features import ERROR_ON_USER_DATA_FAILURE - assert ERROR_ON_USER_DATA_FAILURE is False - - @pytest.mark.parametrize('create_override', - [{'SPAM': True}], - indirect=True) - def test_feature_only_in_override(self, create_override): - from cloudinit.features import SPAM - assert SPAM is True diff --git a/cloudinit/tests/test_gpg.py b/cloudinit/tests/test_gpg.py deleted file mode 100644 index 311dfad6..00000000 --- a/cloudinit/tests/test_gpg.py +++ /dev/null @@ -1,55 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. -"""Test gpg module.""" - -from unittest import mock - -from cloudinit import gpg -from cloudinit import subp -from cloudinit.tests.helpers import CiTestCase - - -@mock.patch("cloudinit.gpg.time.sleep") -@mock.patch("cloudinit.gpg.subp.subp") -class TestReceiveKeys(CiTestCase): - """Test the recv_key method.""" - - def test_retries_on_subp_exc(self, m_subp, m_sleep): - """retry should be done on gpg receive keys failure.""" - retries = (1, 2, 4) - my_exc = subp.ProcessExecutionError( - stdout='', stderr='', exit_code=2, cmd=['mycmd']) - m_subp.side_effect = (my_exc, my_exc, ('', '')) - gpg.recv_key("ABCD", "keyserver.example.com", retries=retries) - self.assertEqual([mock.call(1), mock.call(2)], m_sleep.call_args_list) - - def test_raises_error_after_retries(self, m_subp, m_sleep): - """If the final run fails, error should be raised.""" - naplen = 1 - keyid, keyserver = ("ABCD", "keyserver.example.com") - m_subp.side_effect = subp.ProcessExecutionError( - stdout='', stderr='', exit_code=2, cmd=['mycmd']) - with self.assertRaises(ValueError) as rcm: - gpg.recv_key(keyid, keyserver, retries=(naplen,)) - self.assertIn(keyid, str(rcm.exception)) - self.assertIn(keyserver, str(rcm.exception)) - m_sleep.assert_called_with(naplen) - - def test_no_retries_on_none(self, m_subp, m_sleep): - """retry should not be done if retries is None.""" - m_subp.side_effect = subp.ProcessExecutionError( - stdout='', stderr='', exit_code=2, cmd=['mycmd']) - with self.assertRaises(ValueError): - gpg.recv_key("ABCD", "keyserver.example.com", retries=None) - m_sleep.assert_not_called() - - def test_expected_gpg_command(self, m_subp, m_sleep): - """Verify gpg is called with expected args.""" - key, keyserver = ("DEADBEEF", "keyserver.example.com") - retries = (1, 2, 4) - m_subp.return_value = ('', '') - gpg.recv_key(key, keyserver, retries=retries) - m_subp.assert_called_once_with( - ['gpg', '--no-tty', - '--keyserver=%s' % keyserver, '--recv-keys', key], - capture=True) - m_sleep.assert_not_called() diff --git a/cloudinit/tests/test_netinfo.py b/cloudinit/tests/test_netinfo.py deleted file mode 100644 index e44b16d8..00000000 --- a/cloudinit/tests/test_netinfo.py +++ /dev/null @@ -1,181 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -"""Tests netinfo module functions and classes.""" - -from copy import copy - -from cloudinit.netinfo import netdev_info, netdev_pformat, route_pformat -from cloudinit.tests.helpers import CiTestCase, mock, readResource - - -# Example ifconfig and route output -SAMPLE_OLD_IFCONFIG_OUT = readResource("netinfo/old-ifconfig-output") -SAMPLE_NEW_IFCONFIG_OUT = readResource("netinfo/new-ifconfig-output") -SAMPLE_FREEBSD_IFCONFIG_OUT = readResource("netinfo/freebsd-ifconfig-output") -SAMPLE_IPADDRSHOW_OUT = readResource("netinfo/sample-ipaddrshow-output") -SAMPLE_ROUTE_OUT_V4 = readResource("netinfo/sample-route-output-v4") -SAMPLE_ROUTE_OUT_V6 = readResource("netinfo/sample-route-output-v6") -SAMPLE_IPROUTE_OUT_V4 = readResource("netinfo/sample-iproute-output-v4") -SAMPLE_IPROUTE_OUT_V6 = readResource("netinfo/sample-iproute-output-v6") -NETDEV_FORMATTED_OUT = readResource("netinfo/netdev-formatted-output") -ROUTE_FORMATTED_OUT = readResource("netinfo/route-formatted-output") -FREEBSD_NETDEV_OUT = readResource("netinfo/freebsd-netdev-formatted-output") - - -class TestNetInfo(CiTestCase): - - maxDiff = None - with_logs = True - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_netdev_old_nettools_pformat(self, m_subp, m_which): - """netdev_pformat properly rendering old nettools info.""" - m_subp.return_value = (SAMPLE_OLD_IFCONFIG_OUT, '') - m_which.side_effect = lambda x: x if x == 'ifconfig' else None - content = netdev_pformat() - self.assertEqual(NETDEV_FORMATTED_OUT, content) - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_netdev_new_nettools_pformat(self, m_subp, m_which): - """netdev_pformat properly rendering netdev new nettools info.""" - m_subp.return_value = (SAMPLE_NEW_IFCONFIG_OUT, '') - m_which.side_effect = lambda x: x if x == 'ifconfig' else None - content = netdev_pformat() - self.assertEqual(NETDEV_FORMATTED_OUT, content) - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_netdev_freebsd_nettools_pformat(self, m_subp, m_which): - """netdev_pformat properly rendering netdev new nettools info.""" - m_subp.return_value = (SAMPLE_FREEBSD_IFCONFIG_OUT, '') - m_which.side_effect = lambda x: x if x == 'ifconfig' else None - content = netdev_pformat() - print() - print(content) - print() - self.assertEqual(FREEBSD_NETDEV_OUT, content) - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_netdev_iproute_pformat(self, m_subp, m_which): - """netdev_pformat properly rendering ip route info.""" - m_subp.return_value = (SAMPLE_IPADDRSHOW_OUT, '') - m_which.side_effect = lambda x: x if x == 'ip' else None - content = netdev_pformat() - new_output = copy(NETDEV_FORMATTED_OUT) - # ip route show describes global scopes on ipv4 addresses - # whereas ifconfig does not. Add proper global/host scope to output. - new_output = new_output.replace('| . | 50:7b', '| global | 50:7b') - new_output = new_output.replace( - '255.0.0.0 | . |', '255.0.0.0 | host |') - self.assertEqual(new_output, content) - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_netdev_warn_on_missing_commands(self, m_subp, m_which): - """netdev_pformat warns when missing both ip and 'netstat'.""" - m_which.return_value = None # Niether ip nor netstat found - content = netdev_pformat() - self.assertEqual('\n', content) - self.assertEqual( - "WARNING: Could not print networks: missing 'ip' and 'ifconfig'" - " commands\n", - self.logs.getvalue()) - m_subp.assert_not_called() - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_netdev_info_nettools_down(self, m_subp, m_which): - """test netdev_info using nettools and down interfaces.""" - m_subp.return_value = ( - readResource("netinfo/new-ifconfig-output-down"), "") - m_which.side_effect = lambda x: x if x == 'ifconfig' else None - self.assertEqual( - {'eth0': {'ipv4': [], 'ipv6': [], - 'hwaddr': '00:16:3e:de:51:a6', 'up': False}, - 'lo': {'ipv4': [{'ip': '127.0.0.1', 'mask': '255.0.0.0'}], - 'ipv6': [{'ip': '::1/128', 'scope6': 'host'}], - 'hwaddr': '.', 'up': True}}, - netdev_info(".")) - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_netdev_info_iproute_down(self, m_subp, m_which): - """Test netdev_info with ip and down interfaces.""" - m_subp.return_value = ( - readResource("netinfo/sample-ipaddrshow-output-down"), "") - m_which.side_effect = lambda x: x if x == 'ip' else None - self.assertEqual( - {'lo': {'ipv4': [{'ip': '127.0.0.1', 'bcast': '.', - 'mask': '255.0.0.0', 'scope': 'host'}], - 'ipv6': [{'ip': '::1/128', 'scope6': 'host'}], - 'hwaddr': '.', 'up': True}, - 'eth0': {'ipv4': [], 'ipv6': [], - 'hwaddr': '00:16:3e:de:51:a6', 'up': False}}, - netdev_info(".")) - - @mock.patch('cloudinit.netinfo.netdev_info') - def test_netdev_pformat_with_down(self, m_netdev_info): - """test netdev_pformat when netdev_info returns 'down' interfaces.""" - m_netdev_info.return_value = ( - {'lo': {'ipv4': [{'ip': '127.0.0.1', 'mask': '255.0.0.0', - 'scope': 'host'}], - 'ipv6': [{'ip': '::1/128', 'scope6': 'host'}], - 'hwaddr': '.', 'up': True}, - 'eth0': {'ipv4': [], 'ipv6': [], - 'hwaddr': '00:16:3e:de:51:a6', 'up': False}}) - self.assertEqual( - readResource("netinfo/netdev-formatted-output-down"), - netdev_pformat()) - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_route_nettools_pformat(self, m_subp, m_which): - """route_pformat properly rendering nettools route info.""" - - def subp_netstat_route_selector(*args, **kwargs): - if args[0] == ['netstat', '--route', '--numeric', '--extend']: - return (SAMPLE_ROUTE_OUT_V4, '') - if args[0] == ['netstat', '-A', 'inet6', '--route', '--numeric']: - return (SAMPLE_ROUTE_OUT_V6, '') - raise Exception('Unexpected subp call %s' % args[0]) - - m_subp.side_effect = subp_netstat_route_selector - m_which.side_effect = lambda x: x if x == 'netstat' else None - content = route_pformat() - self.assertEqual(ROUTE_FORMATTED_OUT, content) - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_route_iproute_pformat(self, m_subp, m_which): - """route_pformat properly rendering ip route info.""" - - def subp_iproute_selector(*args, **kwargs): - if ['ip', '-o', 'route', 'list'] == args[0]: - return (SAMPLE_IPROUTE_OUT_V4, '') - v6cmd = ['ip', '--oneline', '-6', 'route', 'list', 'table', 'all'] - if v6cmd == args[0]: - return (SAMPLE_IPROUTE_OUT_V6, '') - raise Exception('Unexpected subp call %s' % args[0]) - - m_subp.side_effect = subp_iproute_selector - m_which.side_effect = lambda x: x if x == 'ip' else None - content = route_pformat() - self.assertEqual(ROUTE_FORMATTED_OUT, content) - - @mock.patch('cloudinit.netinfo.subp.which') - @mock.patch('cloudinit.netinfo.subp.subp') - def test_route_warn_on_missing_commands(self, m_subp, m_which): - """route_pformat warns when missing both ip and 'netstat'.""" - m_which.return_value = None # Niether ip nor netstat found - content = route_pformat() - self.assertEqual('\n', content) - self.assertEqual( - "WARNING: Could not print routes: missing 'ip' and 'netstat'" - " commands\n", - self.logs.getvalue()) - m_subp.assert_not_called() - -# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_persistence.py b/cloudinit/tests/test_persistence.py deleted file mode 100644 index ec1152a9..00000000 --- a/cloudinit/tests/test_persistence.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright (C) 2020 Canonical Ltd. -# -# Author: Daniel Watkins <oddbloke@ubuntu.com> -# -# This file is part of cloud-init. See LICENSE file for license information. -""" -Tests for cloudinit.persistence. - -Per https://docs.python.org/3/library/pickle.html, only "classes that are -defined at the top level of a module" can be pickled. This means that all of -our ``CloudInitPickleMixin`` subclasses for testing must be defined at -module-level (rather than being defined inline or dynamically in the body of -test methods, as we would do without this constraint). - -``TestPickleMixin.test_subclasses`` iterates over a list of all of these -classes, and tests that they round-trip through a pickle dump/load. As the -interface we're testing is that ``_unpickle`` is called appropriately on -subclasses, our subclasses define their assertions in their ``_unpickle`` -implementation. (This means that the assertions will not be executed if -``_unpickle`` is not called at all; we have -``TestPickleMixin.test_unpickle_called`` to ensure it is called.) - -To avoid manually maintaining a list of classes for parametrization we use a -simple metaclass, ``_Collector``, to gather them up. -""" - -import pickle -from unittest import mock - -import pytest - -from cloudinit.persistence import CloudInitPickleMixin - - -class _Collector(type): - """Any class using this as a metaclass will be stored in test_classes.""" - - test_classes = [] - - def __new__(cls, *args): - new_cls = super().__new__(cls, *args) - _Collector.test_classes.append(new_cls) - return new_cls - - -class InstanceVersionNotUsed(CloudInitPickleMixin, metaclass=_Collector): - """Test that the class version is used over one set in instance state.""" - - _ci_pkl_version = 1 - - def __init__(self): - self._ci_pkl_version = 2 - - def _unpickle(self, ci_pkl_version: int) -> None: - assert 1 == ci_pkl_version - - -class MissingVersionHandled(CloudInitPickleMixin, metaclass=_Collector): - """Test that pickles without ``_ci_pkl_version`` are handled gracefully. - - This is tested by overriding ``__getstate__`` so the dumped pickle of this - class will not have ``_ci_pkl_version`` included. - """ - - def __getstate__(self): - return self.__dict__ - - def _unpickle(self, ci_pkl_version: int) -> None: - assert 0 == ci_pkl_version - - -class OverridenVersionHonored(CloudInitPickleMixin, metaclass=_Collector): - """Test that the subclass's version is used.""" - - _ci_pkl_version = 1 - - def _unpickle(self, ci_pkl_version: int) -> None: - assert 1 == ci_pkl_version - - -class StateIsRestored(CloudInitPickleMixin, metaclass=_Collector): - """Instance state should be restored before ``_unpickle`` is called.""" - - def __init__(self): - self.some_state = "some state" - - def _unpickle(self, ci_pkl_version: int) -> None: - assert "some state" == self.some_state - - -class UnpickleCanBeUnoverriden(CloudInitPickleMixin, metaclass=_Collector): - """Subclasses should not need to override ``_unpickle``.""" - - -class VersionDefaultsToZero(CloudInitPickleMixin, metaclass=_Collector): - """Test that the default version is 0.""" - - def _unpickle(self, ci_pkl_version: int) -> None: - assert 0 == ci_pkl_version - - -class VersionIsPoppedFromState(CloudInitPickleMixin, metaclass=_Collector): - """Test _ci_pkl_version is popped from state before being restored.""" - - def _unpickle(self, ci_pkl_version: int) -> None: - # `self._ci_pkl_version` returns the type's _ci_pkl_version if it isn't - # in instance state, so we need to explicitly check self.__dict__. - assert "_ci_pkl_version" not in self.__dict__ - - -class TestPickleMixin: - def test_unpickle_called(self): - """Test that self._unpickle is called on unpickle.""" - with mock.patch.object( - CloudInitPickleMixin, "_unpickle" - ) as m_unpickle: - pickle.loads(pickle.dumps(CloudInitPickleMixin())) - assert 1 == m_unpickle.call_count - - @pytest.mark.parametrize("cls", _Collector.test_classes) - def test_subclasses(self, cls): - """For each collected class, round-trip through pickle dump/load. - - Assertions are implemented in ``cls._unpickle``, and so are evoked as - part of the pickle load. - """ - pickle.loads(pickle.dumps(cls())) diff --git a/cloudinit/tests/test_simpletable.py b/cloudinit/tests/test_simpletable.py deleted file mode 100644 index a12a62a0..00000000 --- a/cloudinit/tests/test_simpletable.py +++ /dev/null @@ -1,106 +0,0 @@ -# Copyright (C) 2017 Amazon.com, Inc. or its affiliates -# -# Author: Andrew Jorgensen <ajorgens@amazon.com> -# -# This file is part of cloud-init. See LICENSE file for license information. -"""Tests that SimpleTable works just like PrettyTable for cloud-init. - -Not all possible PrettyTable cases are tested because we're not trying to -reimplement the entire library, only the minimal parts we actually use. -""" - -from cloudinit.simpletable import SimpleTable -from cloudinit.tests.helpers import CiTestCase - -# Examples rendered by cloud-init using PrettyTable -NET_DEVICE_FIELDS = ( - 'Device', 'Up', 'Address', 'Mask', 'Scope', 'Hw-Address') -NET_DEVICE_ROWS = ( - ('ens3', True, '172.31.4.203', '255.255.240.0', '.', '0a:1f:07:15:98:70'), - ('ens3', True, 'fe80::81f:7ff:fe15:9870/64', '.', 'link', - '0a:1f:07:15:98:70'), - ('lo', True, '127.0.0.1', '255.0.0.0', '.', '.'), - ('lo', True, '::1/128', '.', 'host', '.'), -) -NET_DEVICE_TABLE = """\ -+--------+------+----------------------------+---------------+-------+-------------------+ -| Device | Up | Address | Mask | Scope | Hw-Address | -+--------+------+----------------------------+---------------+-------+-------------------+ -| ens3 | True | 172.31.4.203 | 255.255.240.0 | . | 0a:1f:07:15:98:70 | -| ens3 | True | fe80::81f:7ff:fe15:9870/64 | . | link | 0a:1f:07:15:98:70 | -| lo | True | 127.0.0.1 | 255.0.0.0 | . | . | -| lo | True | ::1/128 | . | host | . | -+--------+------+----------------------------+---------------+-------+-------------------+""" # noqa: E501 -ROUTE_IPV4_FIELDS = ( - 'Route', 'Destination', 'Gateway', 'Genmask', 'Interface', 'Flags') -ROUTE_IPV4_ROWS = ( - ('0', '0.0.0.0', '172.31.0.1', '0.0.0.0', 'ens3', 'UG'), - ('1', '169.254.0.0', '0.0.0.0', '255.255.0.0', 'ens3', 'U'), - ('2', '172.31.0.0', '0.0.0.0', '255.255.240.0', 'ens3', 'U'), -) -ROUTE_IPV4_TABLE = """\ -+-------+-------------+------------+---------------+-----------+-------+ -| Route | Destination | Gateway | Genmask | Interface | Flags | -+-------+-------------+------------+---------------+-----------+-------+ -| 0 | 0.0.0.0 | 172.31.0.1 | 0.0.0.0 | ens3 | UG | -| 1 | 169.254.0.0 | 0.0.0.0 | 255.255.0.0 | ens3 | U | -| 2 | 172.31.0.0 | 0.0.0.0 | 255.255.240.0 | ens3 | U | -+-------+-------------+------------+---------------+-----------+-------+""" - -AUTHORIZED_KEYS_FIELDS = ( - 'Keytype', 'Fingerprint (md5)', 'Options', 'Comment') -AUTHORIZED_KEYS_ROWS = ( - ('ssh-rsa', '24:c7:41:49:47:12:31:a0:de:6f:62:79:9b:13:06:36', '-', - 'ajorgens'), -) -AUTHORIZED_KEYS_TABLE = """\ -+---------+-------------------------------------------------+---------+----------+ -| Keytype | Fingerprint (md5) | Options | Comment | -+---------+-------------------------------------------------+---------+----------+ -| ssh-rsa | 24:c7:41:49:47:12:31:a0:de:6f:62:79:9b:13:06:36 | - | ajorgens | -+---------+-------------------------------------------------+---------+----------+""" # noqa: E501 - -# from prettytable import PrettyTable -# pt = PrettyTable(('HEADER',)) -# print(pt) -NO_ROWS_FIELDS = ('HEADER',) -NO_ROWS_TABLE = """\ -+--------+ -| HEADER | -+--------+ -+--------+""" - - -class TestSimpleTable(CiTestCase): - - def test_no_rows(self): - """An empty table is rendered as PrettyTable would have done it.""" - table = SimpleTable(NO_ROWS_FIELDS) - self.assertEqual(str(table), NO_ROWS_TABLE) - - def test_net_dev(self): - """Net device info is rendered as it was with PrettyTable.""" - table = SimpleTable(NET_DEVICE_FIELDS) - for row in NET_DEVICE_ROWS: - table.add_row(row) - self.assertEqual(str(table), NET_DEVICE_TABLE) - - def test_route_ipv4(self): - """Route IPv4 info is rendered as it was with PrettyTable.""" - table = SimpleTable(ROUTE_IPV4_FIELDS) - for row in ROUTE_IPV4_ROWS: - table.add_row(row) - self.assertEqual(str(table), ROUTE_IPV4_TABLE) - - def test_authorized_keys(self): - """SSH authorized keys are rendered as they were with PrettyTable.""" - table = SimpleTable(AUTHORIZED_KEYS_FIELDS) - for row in AUTHORIZED_KEYS_ROWS: - table.add_row(row) - - def test_get_string(self): - """get_string() method returns the same content as str().""" - table = SimpleTable(AUTHORIZED_KEYS_FIELDS) - for row in AUTHORIZED_KEYS_ROWS: - table.add_row(row) - self.assertEqual(table.get_string(), str(table)) diff --git a/cloudinit/tests/test_stages.py b/cloudinit/tests/test_stages.py deleted file mode 100644 index d2d1b37f..00000000 --- a/cloudinit/tests/test_stages.py +++ /dev/null @@ -1,406 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -"""Tests related to cloudinit.stages module.""" - -import os -import stat - -import pytest - -from cloudinit import stages -from cloudinit import sources -from cloudinit.sources import NetworkConfigSource - -from cloudinit.event import EventType -from cloudinit.util import write_file - -from cloudinit.tests.helpers import CiTestCase, mock - -TEST_INSTANCE_ID = 'i-testing' - - -class FakeDataSource(sources.DataSource): - - def __init__(self, paths=None, userdata=None, vendordata=None, - network_config=''): - super(FakeDataSource, self).__init__({}, None, paths=paths) - self.metadata = {'instance-id': TEST_INSTANCE_ID} - self.userdata_raw = userdata - self.vendordata_raw = vendordata - self._network_config = None - if network_config: # Permit for None value to setup attribute - self._network_config = network_config - - @property - def network_config(self): - return self._network_config - - def _get_data(self): - return True - - -class TestInit(CiTestCase): - with_logs = True - allowed_subp = False - - def setUp(self): - super(TestInit, self).setUp() - self.tmpdir = self.tmp_dir() - self.init = stages.Init() - # Setup fake Paths for Init to reference - self.init._cfg = {'system_info': { - 'distro': 'ubuntu', 'paths': {'cloud_dir': self.tmpdir, - 'run_dir': self.tmpdir}}} - self.init.datasource = FakeDataSource(paths=self.init.paths) - - def test_wb__find_networking_config_disabled(self): - """find_networking_config returns no config when disabled.""" - disable_file = os.path.join( - self.init.paths.get_cpath('data'), 'upgraded-network') - write_file(disable_file, '') - self.assertEqual( - (None, disable_file), - self.init._find_networking_config()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_disabled_by_kernel( - self, m_cmdline, m_initramfs): - """find_networking_config returns when disabled by kernel cmdline.""" - m_cmdline.return_value = {'config': 'disabled'} - m_initramfs.return_value = {'config': ['fake_initrd']} - self.assertEqual( - (None, NetworkConfigSource.cmdline), - self.init._find_networking_config()) - self.assertEqual('DEBUG: network config disabled by cmdline\n', - self.logs.getvalue()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_disabled_by_initrd( - self, m_cmdline, m_initramfs): - """find_networking_config returns when disabled by kernel cmdline.""" - m_cmdline.return_value = {} - m_initramfs.return_value = {'config': 'disabled'} - self.assertEqual( - (None, NetworkConfigSource.initramfs), - self.init._find_networking_config()) - self.assertEqual('DEBUG: network config disabled by initramfs\n', - self.logs.getvalue()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_disabled_by_datasrc( - self, m_cmdline, m_initramfs): - """find_networking_config returns when disabled by datasource cfg.""" - m_cmdline.return_value = {} # Kernel doesn't disable networking - m_initramfs.return_value = {} # initramfs doesn't disable networking - self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}}, - 'network': {}} # system config doesn't disable - - self.init.datasource = FakeDataSource( - network_config={'config': 'disabled'}) - self.assertEqual( - (None, NetworkConfigSource.ds), - self.init._find_networking_config()) - self.assertEqual('DEBUG: network config disabled by ds\n', - self.logs.getvalue()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_disabled_by_sysconfig( - self, m_cmdline, m_initramfs): - """find_networking_config returns when disabled by system config.""" - m_cmdline.return_value = {} # Kernel doesn't disable networking - m_initramfs.return_value = {} # initramfs doesn't disable networking - self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}}, - 'network': {'config': 'disabled'}} - self.assertEqual( - (None, NetworkConfigSource.system_cfg), - self.init._find_networking_config()) - self.assertEqual('DEBUG: network config disabled by system_cfg\n', - self.logs.getvalue()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test__find_networking_config_uses_datasrc_order( - self, m_cmdline, m_initramfs): - """find_networking_config should check sources in DS defined order""" - # cmdline and initramfs, which would normally be preferred over other - # sources, disable networking; in this case, though, the DS moves them - # later so its own config is preferred - m_cmdline.return_value = {'config': 'disabled'} - m_initramfs.return_value = {'config': 'disabled'} - - ds_net_cfg = {'config': {'needle': True}} - self.init.datasource = FakeDataSource(network_config=ds_net_cfg) - self.init.datasource.network_config_sources = [ - NetworkConfigSource.ds, NetworkConfigSource.system_cfg, - NetworkConfigSource.cmdline, NetworkConfigSource.initramfs] - - self.assertEqual( - (ds_net_cfg, NetworkConfigSource.ds), - self.init._find_networking_config()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test__find_networking_config_warns_if_datasrc_uses_invalid_src( - self, m_cmdline, m_initramfs): - """find_networking_config should check sources in DS defined order""" - ds_net_cfg = {'config': {'needle': True}} - self.init.datasource = FakeDataSource(network_config=ds_net_cfg) - self.init.datasource.network_config_sources = [ - 'invalid_src', NetworkConfigSource.ds] - - self.assertEqual( - (ds_net_cfg, NetworkConfigSource.ds), - self.init._find_networking_config()) - self.assertIn('WARNING: data source specifies an invalid network' - ' cfg_source: invalid_src', - self.logs.getvalue()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test__find_networking_config_warns_if_datasrc_uses_unavailable_src( - self, m_cmdline, m_initramfs): - """find_networking_config should check sources in DS defined order""" - ds_net_cfg = {'config': {'needle': True}} - self.init.datasource = FakeDataSource(network_config=ds_net_cfg) - self.init.datasource.network_config_sources = [ - NetworkConfigSource.fallback, NetworkConfigSource.ds] - - self.assertEqual( - (ds_net_cfg, NetworkConfigSource.ds), - self.init._find_networking_config()) - self.assertIn('WARNING: data source specifies an unavailable network' - ' cfg_source: fallback', - self.logs.getvalue()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_returns_kernel( - self, m_cmdline, m_initramfs): - """find_networking_config returns kernel cmdline config if present.""" - expected_cfg = {'config': ['fakekernel']} - m_cmdline.return_value = expected_cfg - m_initramfs.return_value = {'config': ['fake_initrd']} - self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}}, - 'network': {'config': ['fakesys_config']}} - self.init.datasource = FakeDataSource( - network_config={'config': ['fakedatasource']}) - self.assertEqual( - (expected_cfg, NetworkConfigSource.cmdline), - self.init._find_networking_config()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_returns_initramfs( - self, m_cmdline, m_initramfs): - """find_networking_config returns kernel cmdline config if present.""" - expected_cfg = {'config': ['fake_initrd']} - m_cmdline.return_value = {} - m_initramfs.return_value = expected_cfg - self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}}, - 'network': {'config': ['fakesys_config']}} - self.init.datasource = FakeDataSource( - network_config={'config': ['fakedatasource']}) - self.assertEqual( - (expected_cfg, NetworkConfigSource.initramfs), - self.init._find_networking_config()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_returns_system_cfg( - self, m_cmdline, m_initramfs): - """find_networking_config returns system config when present.""" - m_cmdline.return_value = {} # No kernel network config - m_initramfs.return_value = {} # no initramfs network config - expected_cfg = {'config': ['fakesys_config']} - self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}}, - 'network': expected_cfg} - self.init.datasource = FakeDataSource( - network_config={'config': ['fakedatasource']}) - self.assertEqual( - (expected_cfg, NetworkConfigSource.system_cfg), - self.init._find_networking_config()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_returns_datasrc_cfg( - self, m_cmdline, m_initramfs): - """find_networking_config returns datasource net config if present.""" - m_cmdline.return_value = {} # No kernel network config - m_initramfs.return_value = {} # no initramfs network config - # No system config for network in setUp - expected_cfg = {'config': ['fakedatasource']} - self.init.datasource = FakeDataSource(network_config=expected_cfg) - self.assertEqual( - (expected_cfg, NetworkConfigSource.ds), - self.init._find_networking_config()) - - @mock.patch('cloudinit.stages.cmdline.read_initramfs_config') - @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config') - def test_wb__find_networking_config_returns_fallback( - self, m_cmdline, m_initramfs): - """find_networking_config returns fallback config if not defined.""" - m_cmdline.return_value = {} # Kernel doesn't disable networking - m_initramfs.return_value = {} # no initramfs network config - # Neither datasource nor system_info disable or provide network - - fake_cfg = {'config': [{'type': 'physical', 'name': 'eth9'}], - 'version': 1} - - def fake_generate_fallback(): - return fake_cfg - - # Monkey patch distro which gets cached on self.init - distro = self.init.distro - distro.generate_fallback_config = fake_generate_fallback - self.assertEqual( - (fake_cfg, NetworkConfigSource.fallback), - self.init._find_networking_config()) - self.assertNotIn('network config disabled', self.logs.getvalue()) - - def test_apply_network_config_disabled(self): - """Log when network is disabled by upgraded-network.""" - disable_file = os.path.join( - self.init.paths.get_cpath('data'), 'upgraded-network') - - def fake_network_config(): - return (None, disable_file) - - self.init._find_networking_config = fake_network_config - - self.init.apply_network_config(True) - self.assertIn( - 'INFO: network config is disabled by %s' % disable_file, - self.logs.getvalue()) - - @mock.patch('cloudinit.net.get_interfaces_by_mac') - @mock.patch('cloudinit.distros.ubuntu.Distro') - def test_apply_network_on_new_instance(self, m_ubuntu, m_macs): - """Call distro apply_network_config methods on is_new_instance.""" - net_cfg = { - 'version': 1, 'config': [ - {'subnets': [{'type': 'dhcp'}], 'type': 'physical', - 'name': 'eth9', 'mac_address': '42:42:42:42:42:42'}]} - - def fake_network_config(): - return net_cfg, NetworkConfigSource.fallback - - m_macs.return_value = {'42:42:42:42:42:42': 'eth9'} - - self.init._find_networking_config = fake_network_config - self.init.apply_network_config(True) - self.init.distro.apply_network_config_names.assert_called_with(net_cfg) - self.init.distro.apply_network_config.assert_called_with( - net_cfg, bring_up=True) - - @mock.patch('cloudinit.distros.ubuntu.Distro') - def test_apply_network_on_same_instance_id(self, m_ubuntu): - """Only call distro.apply_network_config_names on same instance id.""" - old_instance_id = os.path.join( - self.init.paths.get_cpath('data'), 'instance-id') - write_file(old_instance_id, TEST_INSTANCE_ID) - net_cfg = { - 'version': 1, 'config': [ - {'subnets': [{'type': 'dhcp'}], 'type': 'physical', - 'name': 'eth9', 'mac_address': '42:42:42:42:42:42'}]} - - def fake_network_config(): - return net_cfg, NetworkConfigSource.fallback - - self.init._find_networking_config = fake_network_config - self.init.apply_network_config(True) - self.init.distro.apply_network_config_names.assert_called_with(net_cfg) - self.init.distro.apply_network_config.assert_not_called() - self.assertIn( - 'No network config applied. Neither a new instance' - " nor datasource network update on '%s' event" % EventType.BOOT, - self.logs.getvalue()) - - @mock.patch('cloudinit.net.get_interfaces_by_mac') - @mock.patch('cloudinit.distros.ubuntu.Distro') - def test_apply_network_on_datasource_allowed_event(self, m_ubuntu, m_macs): - """Apply network if datasource.update_metadata permits BOOT event.""" - old_instance_id = os.path.join( - self.init.paths.get_cpath('data'), 'instance-id') - write_file(old_instance_id, TEST_INSTANCE_ID) - net_cfg = { - 'version': 1, 'config': [ - {'subnets': [{'type': 'dhcp'}], 'type': 'physical', - 'name': 'eth9', 'mac_address': '42:42:42:42:42:42'}]} - - def fake_network_config(): - return net_cfg, NetworkConfigSource.fallback - - m_macs.return_value = {'42:42:42:42:42:42': 'eth9'} - - self.init._find_networking_config = fake_network_config - self.init.datasource = FakeDataSource(paths=self.init.paths) - self.init.datasource.update_events = {'network': [EventType.BOOT]} - self.init.apply_network_config(True) - self.init.distro.apply_network_config_names.assert_called_with(net_cfg) - self.init.distro.apply_network_config.assert_called_with( - net_cfg, bring_up=True) - - -class TestInit_InitializeFilesystem: - """Tests for cloudinit.stages.Init._initialize_filesystem. - - TODO: Expand these tests to cover all of _initialize_filesystem's behavior. - """ - - @pytest.yield_fixture - def init(self, paths): - """A fixture which yields a stages.Init instance with paths and cfg set - - As it is replaced with a mock, consumers of this fixture can set - `init.cfg` if the default empty dict configuration is not appropriate. - """ - with mock.patch( - "cloudinit.stages.Init.cfg", mock.PropertyMock(return_value={}) - ): - with mock.patch("cloudinit.stages.util.ensure_dirs"): - init = stages.Init() - init._paths = paths - yield init - - @mock.patch("cloudinit.stages.util.ensure_file") - def test_ensure_file_not_called_if_no_log_file_configured( - self, m_ensure_file, init - ): - """If no log file is configured, we should not ensure its existence.""" - init.cfg = {} - - init._initialize_filesystem() - - assert 0 == m_ensure_file.call_count - - def test_log_files_existence_is_ensured_if_configured(self, init, tmpdir): - """If a log file is configured, we should ensure its existence.""" - log_file = tmpdir.join("cloud-init.log") - init.cfg = {"def_log_file": str(log_file)} - - init._initialize_filesystem() - - assert log_file.exists - - def test_existing_file_permissions_are_not_modified(self, init, tmpdir): - """If the log file already exists, we should not modify its permissions - - See https://bugs.launchpad.net/cloud-init/+bug/1900837. - """ - # Use a mode that will never be made the default so this test will - # always be valid - mode = 0o606 - log_file = tmpdir.join("cloud-init.log") - log_file.ensure() - log_file.chmod(mode) - init.cfg = {"def_log_file": str(log_file)} - - init._initialize_filesystem() - - assert mode == stat.S_IMODE(log_file.stat().mode) - -# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_subp.py b/cloudinit/tests/test_subp.py deleted file mode 100644 index 911c1f3d..00000000 --- a/cloudinit/tests/test_subp.py +++ /dev/null @@ -1,286 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -"""Tests for cloudinit.subp utility functions""" - -import json -import os -import sys -import stat - -from unittest import mock - -from cloudinit import subp, util -from cloudinit.tests.helpers import CiTestCase - - -BASH = subp.which('bash') -BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name' - - -class TestPrependBaseCommands(CiTestCase): - - with_logs = True - - def test_prepend_base_command_errors_on_neither_string_nor_list(self): - """Raise an error for each command which is not a string or list.""" - orig_commands = ['ls', 1, {'not': 'gonna work'}, ['basecmd', 'list']] - with self.assertRaises(TypeError) as context_manager: - subp.prepend_base_command( - base_command='basecmd', commands=orig_commands) - self.assertEqual( - "Invalid basecmd config. These commands are not a string or" - " list:\n1\n{'not': 'gonna work'}", - str(context_manager.exception)) - - def test_prepend_base_command_warns_on_non_base_string_commands(self): - """Warn on each non-base for commands of type string.""" - orig_commands = [ - 'ls', 'basecmd list', 'touch /blah', 'basecmd install x'] - fixed_commands = subp.prepend_base_command( - base_command='basecmd', commands=orig_commands) - self.assertEqual( - 'WARNING: Non-basecmd commands in basecmd config:\n' - 'ls\ntouch /blah\n', - self.logs.getvalue()) - self.assertEqual(orig_commands, fixed_commands) - - def test_prepend_base_command_prepends_on_non_base_list_commands(self): - """Prepend 'basecmd' for each non-basecmd command of type list.""" - orig_commands = [['ls'], ['basecmd', 'list'], ['basecmda', '/blah'], - ['basecmd', 'install', 'x']] - expected = [['basecmd', 'ls'], ['basecmd', 'list'], - ['basecmd', 'basecmda', '/blah'], - ['basecmd', 'install', 'x']] - fixed_commands = subp.prepend_base_command( - base_command='basecmd', commands=orig_commands) - self.assertEqual('', self.logs.getvalue()) - self.assertEqual(expected, fixed_commands) - - def test_prepend_base_command_removes_first_item_when_none(self): - """Remove the first element of a non-basecmd when it is None.""" - orig_commands = [[None, 'ls'], ['basecmd', 'list'], - [None, 'touch', '/blah'], - ['basecmd', 'install', 'x']] - expected = [['ls'], ['basecmd', 'list'], - ['touch', '/blah'], - ['basecmd', 'install', 'x']] - fixed_commands = subp.prepend_base_command( - base_command='basecmd', commands=orig_commands) - self.assertEqual('', self.logs.getvalue()) - self.assertEqual(expected, fixed_commands) - - -class TestSubp(CiTestCase): - allowed_subp = [BASH, 'cat', CiTestCase.SUBP_SHELL_TRUE, - BOGUS_COMMAND, sys.executable] - - stdin2err = [BASH, '-c', 'cat >&2'] - stdin2out = ['cat'] - utf8_invalid = b'ab\xaadef' - utf8_valid = b'start \xc3\xa9 end' - utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7' - printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--'] - - def printf_cmd(self, *args): - # bash's printf supports \xaa. So does /usr/bin/printf - # but by using bash, we remove dependency on another program. - return([BASH, '-c', 'printf "$@"', 'printf'] + list(args)) - - def test_subp_handles_bytestrings(self): - """subp can run a bytestring command if shell is True.""" - tmp_file = self.tmp_path('test.out') - cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file) - (out, _err) = subp.subp(cmd.encode('utf-8'), shell=True) - self.assertEqual(u'', out) - self.assertEqual(u'', _err) - self.assertEqual('HI MOM\n', util.load_file(tmp_file)) - - def test_subp_handles_strings(self): - """subp can run a string command if shell is True.""" - tmp_file = self.tmp_path('test.out') - cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file) - (out, _err) = subp.subp(cmd, shell=True) - self.assertEqual(u'', out) - self.assertEqual(u'', _err) - self.assertEqual('HI MOM\n', util.load_file(tmp_file)) - - def test_subp_handles_utf8(self): - # The given bytes contain utf-8 accented characters as seen in e.g. - # the "deja dup" package in Ubuntu. - cmd = self.printf_cmd(self.utf8_valid_2) - (out, _err) = subp.subp(cmd, capture=True) - self.assertEqual(out, self.utf8_valid_2.decode('utf-8')) - - def test_subp_respects_decode_false(self): - (out, err) = subp.subp(self.stdin2out, capture=True, decode=False, - data=self.utf8_valid) - self.assertTrue(isinstance(out, bytes)) - self.assertTrue(isinstance(err, bytes)) - self.assertEqual(out, self.utf8_valid) - - def test_subp_decode_ignore(self): - # this executes a string that writes invalid utf-8 to stdout - (out, _err) = subp.subp(self.printf_cmd('abc\\xaadef'), - capture=True, decode='ignore') - self.assertEqual(out, 'abcdef') - - def test_subp_decode_strict_valid_utf8(self): - (out, _err) = subp.subp(self.stdin2out, capture=True, - decode='strict', data=self.utf8_valid) - self.assertEqual(out, self.utf8_valid.decode('utf-8')) - - def test_subp_decode_invalid_utf8_replaces(self): - (out, _err) = subp.subp(self.stdin2out, capture=True, - data=self.utf8_invalid) - expected = self.utf8_invalid.decode('utf-8', 'replace') - self.assertEqual(out, expected) - - def test_subp_decode_strict_raises(self): - args = [] - kwargs = {'args': self.stdin2out, 'capture': True, - 'decode': 'strict', 'data': self.utf8_invalid} - self.assertRaises(UnicodeDecodeError, subp.subp, *args, **kwargs) - - def test_subp_capture_stderr(self): - data = b'hello world' - (out, err) = subp.subp(self.stdin2err, capture=True, - decode=False, data=data, - update_env={'LC_ALL': 'C'}) - self.assertEqual(err, data) - self.assertEqual(out, b'') - - def test_subp_reads_env(self): - with mock.patch.dict("os.environ", values={'FOO': 'BAR'}): - out, _err = subp.subp(self.printenv + ['FOO'], capture=True) - self.assertEqual('FOO=BAR', out.splitlines()[0]) - - def test_subp_env_and_update_env(self): - out, _err = subp.subp( - self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, - env={'FOO': 'BAR'}, - update_env={'HOME': '/myhome', 'K2': 'V2'}) - self.assertEqual( - ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines()) - - def test_subp_update_env(self): - extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'} - with mock.patch.dict("os.environ", values=extra): - out, _err = subp.subp( - self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, - update_env={'HOME': '/myhome', 'K2': 'V2'}) - - self.assertEqual( - ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines()) - - def test_subp_warn_missing_shebang(self): - """Warn on no #! in script""" - noshebang = self.tmp_path('noshebang') - util.write_file(noshebang, 'true\n') - - print("os is %s" % os) - os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC) - with self.allow_subp([noshebang]): - self.assertRaisesRegex(subp.ProcessExecutionError, - r'Missing #! in script\?', - subp.subp, (noshebang,)) - - def test_subp_combined_stderr_stdout(self): - """Providing combine_capture as True redirects stderr to stdout.""" - data = b'hello world' - (out, err) = subp.subp(self.stdin2err, capture=True, - combine_capture=True, decode=False, data=data) - self.assertEqual(b'', err) - self.assertEqual(data, out) - - def test_returns_none_if_no_capture(self): - (out, err) = subp.subp(self.stdin2out, data=b'', capture=False) - self.assertIsNone(err) - self.assertIsNone(out) - - def test_exception_has_out_err_are_bytes_if_decode_false(self): - """Raised exc should have stderr, stdout as bytes if no decode.""" - with self.assertRaises(subp.ProcessExecutionError) as cm: - subp.subp([BOGUS_COMMAND], decode=False) - self.assertTrue(isinstance(cm.exception.stdout, bytes)) - self.assertTrue(isinstance(cm.exception.stderr, bytes)) - - def test_exception_has_out_err_are_bytes_if_decode_true(self): - """Raised exc should have stderr, stdout as string if no decode.""" - with self.assertRaises(subp.ProcessExecutionError) as cm: - subp.subp([BOGUS_COMMAND], decode=True) - self.assertTrue(isinstance(cm.exception.stdout, str)) - self.assertTrue(isinstance(cm.exception.stderr, str)) - - def test_bunch_of_slashes_in_path(self): - self.assertEqual("/target/my/path/", - subp.target_path("/target/", "//my/path/")) - self.assertEqual("/target/my/path/", - subp.target_path("/target/", "///my/path/")) - - def test_c_lang_can_take_utf8_args(self): - """Independent of system LC_CTYPE, args can contain utf-8 strings. - - When python starts up, its default encoding gets set based on - the value of LC_CTYPE. If no system locale is set, the default - encoding for both python2 and python3 in some paths will end up - being ascii. - - Attempts to use setlocale or patching (or changing) os.environ - in the current environment seem to not be effective. - - This test starts up a python with LC_CTYPE set to C so that - the default encoding will be set to ascii. In such an environment - Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError. - """ - python_prog = '\n'.join([ - 'import json, sys', - 'from cloudinit.subp import subp', - 'data = sys.stdin.read()', - 'cmd = json.loads(data)', - 'subp(cmd, capture=False)', - '']) - cmd = [BASH, '-c', 'echo -n "$@"', '--', - self.utf8_valid.decode("utf-8")] - python_subp = [sys.executable, '-c', python_prog] - - out, _err = subp.subp( - python_subp, update_env={'LC_CTYPE': 'C'}, - data=json.dumps(cmd).encode("utf-8"), - decode=False) - self.assertEqual(self.utf8_valid, out) - - def test_bogus_command_logs_status_messages(self): - """status_cb gets status messages logs on bogus commands provided.""" - logs = [] - - def status_cb(log): - logs.append(log) - - with self.assertRaises(subp.ProcessExecutionError): - subp.subp([BOGUS_COMMAND], status_cb=status_cb) - - expected = [ - 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND), - 'ERROR: End run command: invalid command provided\n'] - self.assertEqual(expected, logs) - - def test_command_logs_exit_codes_to_status_cb(self): - """status_cb gets status messages containing command exit code.""" - logs = [] - - def status_cb(log): - logs.append(log) - - with self.assertRaises(subp.ProcessExecutionError): - subp.subp([BASH, '-c', 'exit 2'], status_cb=status_cb) - subp.subp([BASH, '-c', 'exit 0'], status_cb=status_cb) - - expected = [ - 'Begin run command: %s -c exit 2\n' % BASH, - 'ERROR: End run command: exit(2)\n', - 'Begin run command: %s -c exit 0\n' % BASH, - 'End run command: exit(0)\n'] - self.assertEqual(expected, logs) - - -# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_temp_utils.py b/cloudinit/tests/test_temp_utils.py deleted file mode 100644 index 4a52ef89..00000000 --- a/cloudinit/tests/test_temp_utils.py +++ /dev/null @@ -1,117 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -"""Tests for cloudinit.temp_utils""" - -from cloudinit.temp_utils import mkdtemp, mkstemp, tempdir -from cloudinit.tests.helpers import CiTestCase, wrap_and_call -import os - - -class TestTempUtils(CiTestCase): - - def test_mkdtemp_default_non_root(self): - """mkdtemp creates a dir under /tmp for the unprivileged.""" - calls = [] - - def fake_mkdtemp(*args, **kwargs): - calls.append(kwargs) - return '/fake/return/path' - - retval = wrap_and_call( - 'cloudinit.temp_utils', - {'os.getuid': 1000, - 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp}, - '_TMPDIR': {'new': None}, - 'os.path.isdir': True}, - mkdtemp) - self.assertEqual('/fake/return/path', retval) - self.assertEqual([{'dir': '/tmp'}], calls) - - def test_mkdtemp_default_non_root_needs_exe(self): - """mkdtemp creates a dir under /var/tmp/cloud-init when needs_exe.""" - calls = [] - - def fake_mkdtemp(*args, **kwargs): - calls.append(kwargs) - return '/fake/return/path' - - retval = wrap_and_call( - 'cloudinit.temp_utils', - {'os.getuid': 1000, - 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp}, - '_TMPDIR': {'new': None}, - 'os.path.isdir': True}, - mkdtemp, needs_exe=True) - self.assertEqual('/fake/return/path', retval) - self.assertEqual([{'dir': '/var/tmp/cloud-init'}], calls) - - def test_mkdtemp_default_root(self): - """mkdtemp creates a dir under /run/cloud-init for the privileged.""" - calls = [] - - def fake_mkdtemp(*args, **kwargs): - calls.append(kwargs) - return '/fake/return/path' - - retval = wrap_and_call( - 'cloudinit.temp_utils', - {'os.getuid': 0, - 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp}, - '_TMPDIR': {'new': None}, - 'os.path.isdir': True}, - mkdtemp) - self.assertEqual('/fake/return/path', retval) - self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls) - - def test_mkstemp_default_non_root(self): - """mkstemp creates secure tempfile under /tmp for the unprivileged.""" - calls = [] - - def fake_mkstemp(*args, **kwargs): - calls.append(kwargs) - return '/fake/return/path' - - retval = wrap_and_call( - 'cloudinit.temp_utils', - {'os.getuid': 1000, - 'tempfile.mkstemp': {'side_effect': fake_mkstemp}, - '_TMPDIR': {'new': None}, - 'os.path.isdir': True}, - mkstemp) - self.assertEqual('/fake/return/path', retval) - self.assertEqual([{'dir': '/tmp'}], calls) - - def test_mkstemp_default_root(self): - """mkstemp creates a secure tempfile in /run/cloud-init for root.""" - calls = [] - - def fake_mkstemp(*args, **kwargs): - calls.append(kwargs) - return '/fake/return/path' - - retval = wrap_and_call( - 'cloudinit.temp_utils', - {'os.getuid': 0, - 'tempfile.mkstemp': {'side_effect': fake_mkstemp}, - '_TMPDIR': {'new': None}, - 'os.path.isdir': True}, - mkstemp) - self.assertEqual('/fake/return/path', retval) - self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls) - - def test_tempdir_error_suppression(self): - """test tempdir suppresses errors during directory removal.""" - - with self.assertRaises(OSError): - with tempdir(prefix='cloud-init-dhcp-') as tdir: - os.rmdir(tdir) - # As a result, the directory is already gone, - # so shutil.rmtree should raise OSError - - with tempdir(rmtree_ignore_errors=True, - prefix='cloud-init-dhcp-') as tdir: - os.rmdir(tdir) - # Since the directory is already gone, shutil.rmtree would raise - # OSError, but we suppress that - -# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_upgrade.py b/cloudinit/tests/test_upgrade.py deleted file mode 100644 index f79a2536..00000000 --- a/cloudinit/tests/test_upgrade.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (C) 2020 Canonical Ltd. -# -# Author: Daniel Watkins <oddbloke@ubuntu.com> -# -# This file is part of cloud-init. See LICENSE file for license information. - -"""Upgrade testing for cloud-init. - -This module tests cloud-init's behaviour across upgrades. Specifically, it -specifies a set of invariants that the current codebase expects to be true (as -tests in ``TestUpgrade``) and then checks that these hold true after unpickling -``obj.pkl``s from previous versions of cloud-init; those pickles are stored in -``tests/data/old_pickles/``. -""" - -import operator -import pathlib - -import pytest - -from cloudinit.stages import _pkl_load -from cloudinit.tests.helpers import resourceLocation - - -class TestUpgrade: - @pytest.fixture( - params=pathlib.Path(resourceLocation("old_pickles")).glob("*.pkl"), - scope="class", - ids=operator.attrgetter("name"), - ) - def previous_obj_pkl(self, request): - """Load each pickle to memory once, then run all tests against it. - - Test implementations _must not_ modify the ``previous_obj_pkl`` which - they are passed, as that will affect tests that run after them. - """ - return _pkl_load(str(request.param)) - - def test_networking_set_on_distro(self, previous_obj_pkl): - """We always expect to have ``.networking`` on ``Distro`` objects.""" - assert previous_obj_pkl.distro.networking is not None - - def test_blacklist_drivers_set_on_networking(self, previous_obj_pkl): - """We always expect Networking.blacklist_drivers to be initialised.""" - assert previous_obj_pkl.distro.networking.blacklist_drivers is None diff --git a/cloudinit/tests/test_url_helper.py b/cloudinit/tests/test_url_helper.py deleted file mode 100644 index 364ec822..00000000 --- a/cloudinit/tests/test_url_helper.py +++ /dev/null @@ -1,174 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -from cloudinit.url_helper import ( - NOT_FOUND, UrlError, REDACTED, oauth_headers, read_file_or_url, - retry_on_url_exc) -from cloudinit.tests.helpers import CiTestCase, mock, skipIf -from cloudinit import util -from cloudinit import version - -import httpretty -import requests - - -try: - import oauthlib - assert oauthlib # avoid pyflakes error F401: import unused - _missing_oauthlib_dep = False -except ImportError: - _missing_oauthlib_dep = True - - -M_PATH = 'cloudinit.url_helper.' - - -class TestOAuthHeaders(CiTestCase): - - def test_oauth_headers_raises_not_implemented_when_oathlib_missing(self): - """oauth_headers raises a NotImplemented error when oauth absent.""" - with mock.patch.dict('sys.modules', {'oauthlib': None}): - with self.assertRaises(NotImplementedError) as context_manager: - oauth_headers(1, 2, 3, 4, 5) - self.assertEqual( - 'oauth support is not available', - str(context_manager.exception)) - - @skipIf(_missing_oauthlib_dep, "No python-oauthlib dependency") - @mock.patch('oauthlib.oauth1.Client') - def test_oauth_headers_calls_oathlibclient_when_available(self, m_client): - """oauth_headers calls oaut1.hClient.sign with the provided url.""" - class fakeclient(object): - def sign(self, url): - # The first and 3rd item of the client.sign tuple are ignored - return ('junk', url, 'junk2') - - m_client.return_value = fakeclient() - - return_value = oauth_headers( - 'url', 'consumer_key', 'token_key', 'token_secret', - 'consumer_secret') - self.assertEqual('url', return_value) - - -class TestReadFileOrUrl(CiTestCase): - - with_logs = True - - def test_read_file_or_url_str_from_file(self): - """Test that str(result.contents) on file is text version of contents. - It should not be "b'data'", but just "'data'" """ - tmpf = self.tmp_path("myfile1") - data = b'This is my file content\n' - util.write_file(tmpf, data, omode="wb") - result = read_file_or_url("file://%s" % tmpf) - self.assertEqual(result.contents, data) - self.assertEqual(str(result), data.decode('utf-8')) - - @httpretty.activate - def test_read_file_or_url_str_from_url(self): - """Test that str(result.contents) on url is text version of contents. - It should not be "b'data'", but just "'data'" """ - url = 'http://hostname/path' - data = b'This is my url content\n' - httpretty.register_uri(httpretty.GET, url, data) - result = read_file_or_url(url) - self.assertEqual(result.contents, data) - self.assertEqual(str(result), data.decode('utf-8')) - - @httpretty.activate - def test_read_file_or_url_str_from_url_redacting_headers_from_logs(self): - """Headers are redacted from logs but unredacted in requests.""" - url = 'http://hostname/path' - headers = {'sensitive': 'sekret', 'server': 'blah'} - httpretty.register_uri(httpretty.GET, url) - - read_file_or_url(url, headers=headers, headers_redact=['sensitive']) - logs = self.logs.getvalue() - for k in headers.keys(): - self.assertEqual(headers[k], httpretty.last_request().headers[k]) - self.assertIn(REDACTED, logs) - self.assertNotIn('sekret', logs) - - @httpretty.activate - def test_read_file_or_url_str_from_url_redacts_noheaders(self): - """When no headers_redact, header values are in logs and requests.""" - url = 'http://hostname/path' - headers = {'sensitive': 'sekret', 'server': 'blah'} - httpretty.register_uri(httpretty.GET, url) - - read_file_or_url(url, headers=headers) - for k in headers.keys(): - self.assertEqual(headers[k], httpretty.last_request().headers[k]) - logs = self.logs.getvalue() - self.assertNotIn(REDACTED, logs) - self.assertIn('sekret', logs) - - @mock.patch(M_PATH + 'readurl') - def test_read_file_or_url_passes_params_to_readurl(self, m_readurl): - """read_file_or_url passes all params through to readurl.""" - url = 'http://hostname/path' - response = 'This is my url content\n' - m_readurl.return_value = response - params = {'url': url, 'timeout': 1, 'retries': 2, - 'headers': {'somehdr': 'val'}, - 'data': 'data', 'sec_between': 1, - 'ssl_details': {'cert_file': '/path/cert.pem'}, - 'headers_cb': 'headers_cb', 'exception_cb': 'exception_cb'} - self.assertEqual(response, read_file_or_url(**params)) - params.pop('url') # url is passed in as a positional arg - self.assertEqual([mock.call(url, **params)], m_readurl.call_args_list) - - def test_wb_read_url_defaults_honored_by_read_file_or_url_callers(self): - """Readurl param defaults used when unspecified by read_file_or_url - - Param defaults tested are as follows: - retries: 0, additional headers None beyond default, method: GET, - data: None, check_status: True and allow_redirects: True - """ - url = 'http://hostname/path' - - m_response = mock.MagicMock() - - class FakeSession(requests.Session): - @classmethod - def request(cls, **kwargs): - self.assertEqual( - {'url': url, 'allow_redirects': True, 'method': 'GET', - 'headers': { - 'User-Agent': 'Cloud-Init/%s' % ( - version.version_string())}}, - kwargs) - return m_response - - with mock.patch(M_PATH + 'requests.Session') as m_session: - error = requests.exceptions.HTTPError('broke') - m_session.side_effect = [error, FakeSession()] - # assert no retries and check_status == True - with self.assertRaises(UrlError) as context_manager: - response = read_file_or_url(url) - self.assertEqual('broke', str(context_manager.exception)) - # assert default headers, method, url and allow_redirects True - # Success on 2nd call with FakeSession - response = read_file_or_url(url) - self.assertEqual(m_response, response._response) - - -class TestRetryOnUrlExc(CiTestCase): - - def test_do_not_retry_non_urlerror(self): - """When exception is not UrlError return False.""" - myerror = IOError('something unexcpected') - self.assertFalse(retry_on_url_exc(msg='', exc=myerror)) - - def test_perform_retries_on_not_found(self): - """When exception is UrlError with a 404 status code return True.""" - myerror = UrlError(cause=RuntimeError( - 'something was not found'), code=NOT_FOUND) - self.assertTrue(retry_on_url_exc(msg='', exc=myerror)) - - def test_perform_retries_on_timeout(self): - """When exception is a requests.Timout return True.""" - myerror = UrlError(cause=requests.Timeout('something timed out')) - self.assertTrue(retry_on_url_exc(msg='', exc=myerror)) - -# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_util.py b/cloudinit/tests/test_util.py deleted file mode 100644 index b7a302f1..00000000 --- a/cloudinit/tests/test_util.py +++ /dev/null @@ -1,854 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -"""Tests for cloudinit.util""" - -import base64 -import logging -import json -import platform -import pytest - -import cloudinit.util as util -from cloudinit import subp - -from cloudinit.tests.helpers import CiTestCase, mock -from textwrap import dedent - -LOG = logging.getLogger(__name__) - -MOUNT_INFO = [ - '68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64', - '153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2' -] - -OS_RELEASE_SLES = dedent("""\ - NAME="SLES" - VERSION="12-SP3" - VERSION_ID="12.3" - PRETTY_NAME="SUSE Linux Enterprise Server 12 SP3" - ID="sles" - ANSI_COLOR="0;32" - CPE_NAME="cpe:/o:suse:sles:12:sp3" -""") - -OS_RELEASE_OPENSUSE = dedent("""\ - NAME="openSUSE Leap" - VERSION="42.3" - ID=opensuse - ID_LIKE="suse" - VERSION_ID="42.3" - PRETTY_NAME="openSUSE Leap 42.3" - ANSI_COLOR="0;32" - CPE_NAME="cpe:/o:opensuse:leap:42.3" - BUG_REPORT_URL="https://bugs.opensuse.org" - HOME_URL="https://www.opensuse.org/" -""") - -OS_RELEASE_OPENSUSE_L15 = dedent("""\ - NAME="openSUSE Leap" - VERSION="15.0" - ID="opensuse-leap" - ID_LIKE="suse opensuse" - VERSION_ID="15.0" - PRETTY_NAME="openSUSE Leap 15.0" - ANSI_COLOR="0;32" - CPE_NAME="cpe:/o:opensuse:leap:15.0" - BUG_REPORT_URL="https://bugs.opensuse.org" - HOME_URL="https://www.opensuse.org/" -""") - -OS_RELEASE_OPENSUSE_TW = dedent("""\ - NAME="openSUSE Tumbleweed" - ID="opensuse-tumbleweed" - ID_LIKE="opensuse suse" - VERSION_ID="20180920" - PRETTY_NAME="openSUSE Tumbleweed" - ANSI_COLOR="0;32" - CPE_NAME="cpe:/o:opensuse:tumbleweed:20180920" - BUG_REPORT_URL="https://bugs.opensuse.org" - HOME_URL="https://www.opensuse.org/" -""") - -OS_RELEASE_CENTOS = dedent("""\ - NAME="CentOS Linux" - VERSION="7 (Core)" - ID="centos" - ID_LIKE="rhel fedora" - VERSION_ID="7" - PRETTY_NAME="CentOS Linux 7 (Core)" - ANSI_COLOR="0;31" - CPE_NAME="cpe:/o:centos:centos:7" - HOME_URL="https://www.centos.org/" - BUG_REPORT_URL="https://bugs.centos.org/" - - CENTOS_MANTISBT_PROJECT="CentOS-7" - CENTOS_MANTISBT_PROJECT_VERSION="7" - REDHAT_SUPPORT_PRODUCT="centos" - REDHAT_SUPPORT_PRODUCT_VERSION="7" -""") - -OS_RELEASE_REDHAT_7 = dedent("""\ - NAME="Red Hat Enterprise Linux Server" - VERSION="7.5 (Maipo)" - ID="rhel" - ID_LIKE="fedora" - VARIANT="Server" - VARIANT_ID="server" - VERSION_ID="7.5" - PRETTY_NAME="Red Hat" - ANSI_COLOR="0;31" - CPE_NAME="cpe:/o:redhat:enterprise_linux:7.5:GA:server" - HOME_URL="https://www.redhat.com/" - BUG_REPORT_URL="https://bugzilla.redhat.com/" - - REDHAT_BUGZILLA_PRODUCT="Red Hat Enterprise Linux 7" - REDHAT_BUGZILLA_PRODUCT_VERSION=7.5 - REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux" - REDHAT_SUPPORT_PRODUCT_VERSION="7.5" -""") - -REDHAT_RELEASE_CENTOS_6 = "CentOS release 6.10 (Final)" -REDHAT_RELEASE_CENTOS_7 = "CentOS Linux release 7.5.1804 (Core)" -REDHAT_RELEASE_REDHAT_6 = ( - "Red Hat Enterprise Linux Server release 6.10 (Santiago)") -REDHAT_RELEASE_REDHAT_7 = ( - "Red Hat Enterprise Linux Server release 7.5 (Maipo)") - - -OS_RELEASE_DEBIAN = dedent("""\ - PRETTY_NAME="Debian GNU/Linux 9 (stretch)" - NAME="Debian GNU/Linux" - VERSION_ID="9" - VERSION="9 (stretch)" - ID=debian - HOME_URL="https://www.debian.org/" - SUPPORT_URL="https://www.debian.org/support" - BUG_REPORT_URL="https://bugs.debian.org/" -""") - -OS_RELEASE_UBUNTU = dedent("""\ - NAME="Ubuntu"\n - # comment test - VERSION="16.04.3 LTS (Xenial Xerus)"\n - ID=ubuntu\n - ID_LIKE=debian\n - PRETTY_NAME="Ubuntu 16.04.3 LTS"\n - VERSION_ID="16.04"\n - HOME_URL="http://www.ubuntu.com/"\n - SUPPORT_URL="http://help.ubuntu.com/"\n - BUG_REPORT_URL="http://bugs.launchpad.net/ubuntu/"\n - VERSION_CODENAME=xenial\n - UBUNTU_CODENAME=xenial\n -""") - - -class FakeCloud(object): - - def __init__(self, hostname, fqdn): - self.hostname = hostname - self.fqdn = fqdn - self.calls = [] - - def get_hostname(self, fqdn=None, metadata_only=None): - myargs = {} - if fqdn is not None: - myargs['fqdn'] = fqdn - if metadata_only is not None: - myargs['metadata_only'] = metadata_only - self.calls.append(myargs) - if fqdn: - return self.fqdn - return self.hostname - - -class TestUtil(CiTestCase): - - def test_parse_mount_info_no_opts_no_arg(self): - result = util.parse_mount_info('/home', MOUNT_INFO, LOG) - self.assertEqual(('/dev/sda2', 'xfs', '/home'), result) - - def test_parse_mount_info_no_opts_arg(self): - result = util.parse_mount_info('/home', MOUNT_INFO, LOG, False) - self.assertEqual(('/dev/sda2', 'xfs', '/home'), result) - - def test_parse_mount_info_with_opts(self): - result = util.parse_mount_info('/', MOUNT_INFO, LOG, True) - self.assertEqual( - ('/dev/sda1', 'btrfs', '/', 'ro,relatime'), - result - ) - - @mock.patch('cloudinit.util.get_mount_info') - def test_mount_is_rw(self, m_mount_info): - m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'rw,relatime') - is_rw = util.mount_is_read_write('/') - self.assertEqual(is_rw, True) - - @mock.patch('cloudinit.util.get_mount_info') - def test_mount_is_ro(self, m_mount_info): - m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'ro,relatime') - is_rw = util.mount_is_read_write('/') - self.assertEqual(is_rw, False) - - -class TestUptime(CiTestCase): - - @mock.patch('cloudinit.util.boottime') - @mock.patch('cloudinit.util.os.path.exists') - @mock.patch('cloudinit.util.time.time') - def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime): - boottime = 1000.0 - uptime = 10.0 - m_boottime.return_value = boottime - m_time.return_value = boottime + uptime - m_exists.return_value = False - result = util.uptime() - self.assertEqual(str(uptime), result) - - -class TestShellify(CiTestCase): - - def test_input_dict_raises_type_error(self): - self.assertRaisesRegex( - TypeError, 'Input.*was.*dict.*xpected', - util.shellify, {'mykey': 'myval'}) - - def test_input_str_raises_type_error(self): - self.assertRaisesRegex( - TypeError, 'Input.*was.*str.*xpected', util.shellify, "foobar") - - def test_value_with_int_raises_type_error(self): - self.assertRaisesRegex( - TypeError, 'shellify.*int', util.shellify, ["foo", 1]) - - def test_supports_strings_and_lists(self): - self.assertEqual( - '\n'.join(["#!/bin/sh", "echo hi mom", "'echo' 'hi dad'", - "'echo' 'hi' 'sis'", ""]), - util.shellify(["echo hi mom", ["echo", "hi dad"], - ('echo', 'hi', 'sis')])) - - -class TestGetHostnameFqdn(CiTestCase): - - def test_get_hostname_fqdn_from_only_cfg_fqdn(self): - """When cfg only has the fqdn key, derive hostname and fqdn from it.""" - hostname, fqdn = util.get_hostname_fqdn( - cfg={'fqdn': 'myhost.domain.com'}, cloud=None) - self.assertEqual('myhost', hostname) - self.assertEqual('myhost.domain.com', fqdn) - - def test_get_hostname_fqdn_from_cfg_fqdn_and_hostname(self): - """When cfg has both fqdn and hostname keys, return them.""" - hostname, fqdn = util.get_hostname_fqdn( - cfg={'fqdn': 'myhost.domain.com', 'hostname': 'other'}, cloud=None) - self.assertEqual('other', hostname) - self.assertEqual('myhost.domain.com', fqdn) - - def test_get_hostname_fqdn_from_cfg_hostname_with_domain(self): - """When cfg has only hostname key which represents a fqdn, use that.""" - hostname, fqdn = util.get_hostname_fqdn( - cfg={'hostname': 'myhost.domain.com'}, cloud=None) - self.assertEqual('myhost', hostname) - self.assertEqual('myhost.domain.com', fqdn) - - def test_get_hostname_fqdn_from_cfg_hostname_without_domain(self): - """When cfg has a hostname without a '.' query cloud.get_hostname.""" - mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') - hostname, fqdn = util.get_hostname_fqdn( - cfg={'hostname': 'myhost'}, cloud=mycloud) - self.assertEqual('myhost', hostname) - self.assertEqual('cloudhost.mycloud.com', fqdn) - self.assertEqual( - [{'fqdn': True, 'metadata_only': False}], mycloud.calls) - - def test_get_hostname_fqdn_from_without_fqdn_or_hostname(self): - """When cfg has neither hostname nor fqdn cloud.get_hostname.""" - mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') - hostname, fqdn = util.get_hostname_fqdn(cfg={}, cloud=mycloud) - self.assertEqual('cloudhost', hostname) - self.assertEqual('cloudhost.mycloud.com', fqdn) - self.assertEqual( - [{'fqdn': True, 'metadata_only': False}, - {'metadata_only': False}], mycloud.calls) - - def test_get_hostname_fqdn_from_passes_metadata_only_to_cloud(self): - """Calls to cloud.get_hostname pass the metadata_only parameter.""" - mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com') - _hn, _fqdn = util.get_hostname_fqdn( - cfg={}, cloud=mycloud, metadata_only=True) - self.assertEqual( - [{'fqdn': True, 'metadata_only': True}, - {'metadata_only': True}], mycloud.calls) - - -class TestBlkid(CiTestCase): - ids = { - "id01": "1111-1111", - "id02": "22222222-2222", - "id03": "33333333-3333", - "id04": "44444444-4444", - "id05": "55555555-5555-5555-5555-555555555555", - "id06": "66666666-6666-6666-6666-666666666666", - "id07": "52894610484658920398", - "id08": "86753098675309867530", - "id09": "99999999-9999-9999-9999-999999999999", - } - - blkid_out = dedent("""\ - /dev/loop0: TYPE="squashfs" - /dev/loop1: TYPE="squashfs" - /dev/loop2: TYPE="squashfs" - /dev/loop3: TYPE="squashfs" - /dev/sda1: UUID="{id01}" TYPE="vfat" PARTUUID="{id02}" - /dev/sda2: UUID="{id03}" TYPE="ext4" PARTUUID="{id04}" - /dev/sda3: UUID="{id05}" TYPE="ext4" PARTUUID="{id06}" - /dev/sda4: LABEL="default" UUID="{id07}" UUID_SUB="{id08}" """ - """TYPE="zfs_member" PARTUUID="{id09}" - /dev/loop4: TYPE="squashfs" - """) - - maxDiff = None - - def _get_expected(self): - return ({ - "/dev/loop0": {"DEVNAME": "/dev/loop0", "TYPE": "squashfs"}, - "/dev/loop1": {"DEVNAME": "/dev/loop1", "TYPE": "squashfs"}, - "/dev/loop2": {"DEVNAME": "/dev/loop2", "TYPE": "squashfs"}, - "/dev/loop3": {"DEVNAME": "/dev/loop3", "TYPE": "squashfs"}, - "/dev/loop4": {"DEVNAME": "/dev/loop4", "TYPE": "squashfs"}, - "/dev/sda1": {"DEVNAME": "/dev/sda1", "TYPE": "vfat", - "UUID": self.ids["id01"], - "PARTUUID": self.ids["id02"]}, - "/dev/sda2": {"DEVNAME": "/dev/sda2", "TYPE": "ext4", - "UUID": self.ids["id03"], - "PARTUUID": self.ids["id04"]}, - "/dev/sda3": {"DEVNAME": "/dev/sda3", "TYPE": "ext4", - "UUID": self.ids["id05"], - "PARTUUID": self.ids["id06"]}, - "/dev/sda4": {"DEVNAME": "/dev/sda4", "TYPE": "zfs_member", - "LABEL": "default", - "UUID": self.ids["id07"], - "UUID_SUB": self.ids["id08"], - "PARTUUID": self.ids["id09"]}, - }) - - @mock.patch("cloudinit.subp.subp") - def test_functional_blkid(self, m_subp): - m_subp.return_value = ( - self.blkid_out.format(**self.ids), "") - self.assertEqual(self._get_expected(), util.blkid()) - m_subp.assert_called_with(["blkid", "-o", "full"], capture=True, - decode="replace") - - @mock.patch("cloudinit.subp.subp") - def test_blkid_no_cache_uses_no_cache(self, m_subp): - """blkid should turn off cache if disable_cache is true.""" - m_subp.return_value = ( - self.blkid_out.format(**self.ids), "") - self.assertEqual(self._get_expected(), - util.blkid(disable_cache=True)) - m_subp.assert_called_with(["blkid", "-o", "full", "-c", "/dev/null"], - capture=True, decode="replace") - - -@mock.patch('cloudinit.subp.subp') -class TestUdevadmSettle(CiTestCase): - def test_with_no_params(self, m_subp): - """called with no parameters.""" - util.udevadm_settle() - m_subp.called_once_with(mock.call(['udevadm', 'settle'])) - - def test_with_exists_and_not_exists(self, m_subp): - """with exists=file where file does not exist should invoke subp.""" - mydev = self.tmp_path("mydev") - util.udevadm_settle(exists=mydev) - m_subp.called_once_with( - ['udevadm', 'settle', '--exit-if-exists=%s' % mydev]) - - def test_with_exists_and_file_exists(self, m_subp): - """with exists=file where file does exist should not invoke subp.""" - mydev = self.tmp_path("mydev") - util.write_file(mydev, "foo\n") - util.udevadm_settle(exists=mydev) - self.assertIsNone(m_subp.call_args) - - def test_with_timeout_int(self, m_subp): - """timeout can be an integer.""" - timeout = 9 - util.udevadm_settle(timeout=timeout) - m_subp.called_once_with( - ['udevadm', 'settle', '--timeout=%s' % timeout]) - - def test_with_timeout_string(self, m_subp): - """timeout can be a string.""" - timeout = "555" - util.udevadm_settle(timeout=timeout) - m_subp.assert_called_once_with( - ['udevadm', 'settle', '--timeout=%s' % timeout]) - - def test_with_exists_and_timeout(self, m_subp): - """test call with both exists and timeout.""" - mydev = self.tmp_path("mydev") - timeout = "3" - util.udevadm_settle(exists=mydev) - m_subp.called_once_with( - ['udevadm', 'settle', '--exit-if-exists=%s' % mydev, - '--timeout=%s' % timeout]) - - def test_subp_exception_raises_to_caller(self, m_subp): - m_subp.side_effect = subp.ProcessExecutionError("BOOM") - self.assertRaises(subp.ProcessExecutionError, util.udevadm_settle) - - -@mock.patch('os.path.exists') -class TestGetLinuxDistro(CiTestCase): - - def setUp(self): - # python2 has no lru_cache, and therefore, no cache_clear() - if hasattr(util.get_linux_distro, "cache_clear"): - util.get_linux_distro.cache_clear() - - @classmethod - def os_release_exists(self, path): - """Side effect function""" - if path == '/etc/os-release': - return 1 - - @classmethod - def redhat_release_exists(self, path): - """Side effect function """ - if path == '/etc/redhat-release': - return 1 - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists): - """Verify we get the correct name if the os-release file has - the distro name in quotes""" - m_os_release.return_value = OS_RELEASE_SLES - m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists - dist = util.get_linux_distro() - self.assertEqual(('sles', '12.3', platform.machine()), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists): - """Verify we get the correct name if the os-release file does not - have the distro name in quotes""" - m_os_release.return_value = OS_RELEASE_UBUNTU - m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists - dist = util.get_linux_distro() - self.assertEqual(('ubuntu', '16.04', 'xenial'), dist) - - @mock.patch('platform.system') - @mock.patch('platform.release') - @mock.patch('cloudinit.util._parse_redhat_release') - def test_get_linux_freebsd(self, m_parse_redhat_release, - m_platform_release, - m_platform_system, m_path_exists): - """Verify we get the correct name and release name on FreeBSD.""" - m_path_exists.return_value = False - m_platform_release.return_value = '12.0-RELEASE-p10' - m_platform_system.return_value = 'FreeBSD' - m_parse_redhat_release.return_value = {} - util.is_BSD.cache_clear() - dist = util.get_linux_distro() - self.assertEqual(('freebsd', '12.0-RELEASE-p10', ''), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_centos6(self, m_os_release, m_path_exists): - """Verify we get the correct name and release name on CentOS 6.""" - m_os_release.return_value = REDHAT_RELEASE_CENTOS_6 - m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists - dist = util.get_linux_distro() - self.assertEqual(('centos', '6.10', 'Final'), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists): - """Verify the correct release info on CentOS 7 without os-release.""" - m_os_release.return_value = REDHAT_RELEASE_CENTOS_7 - m_exists.side_effect = TestGetLinuxDistro.redhat_release_exists - dist = util.get_linux_distro() - self.assertEqual(('centos', '7.5.1804', 'Core'), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists): - """Verify redhat 7 read from os-release.""" - m_os_release.return_value = OS_RELEASE_REDHAT_7 - m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists - dist = util.get_linux_distro() - self.assertEqual(('redhat', '7.5', 'Maipo'), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists): - """Verify redhat 7 read from redhat-release.""" - m_os_release.return_value = REDHAT_RELEASE_REDHAT_7 - m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists - dist = util.get_linux_distro() - self.assertEqual(('redhat', '7.5', 'Maipo'), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists): - """Verify redhat 6 read from redhat-release.""" - m_os_release.return_value = REDHAT_RELEASE_REDHAT_6 - m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists - dist = util.get_linux_distro() - self.assertEqual(('redhat', '6.10', 'Santiago'), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_copr_centos(self, m_os_release, m_path_exists): - """Verify we get the correct name and release name on COPR CentOS.""" - m_os_release.return_value = OS_RELEASE_CENTOS - m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists - dist = util.get_linux_distro() - self.assertEqual(('centos', '7', 'Core'), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_debian(self, m_os_release, m_path_exists): - """Verify we get the correct name and release name on Debian.""" - m_os_release.return_value = OS_RELEASE_DEBIAN - m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists - dist = util.get_linux_distro() - self.assertEqual(('debian', '9', 'stretch'), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_opensuse(self, m_os_release, m_path_exists): - """Verify we get the correct name and machine arch on openSUSE - prior to openSUSE Leap 15. - """ - m_os_release.return_value = OS_RELEASE_OPENSUSE - m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists - dist = util.get_linux_distro() - self.assertEqual(('opensuse', '42.3', platform.machine()), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists): - """Verify we get the correct name and machine arch on openSUSE - for openSUSE Leap 15.0 and later. - """ - m_os_release.return_value = OS_RELEASE_OPENSUSE_L15 - m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists - dist = util.get_linux_distro() - self.assertEqual(('opensuse-leap', '15.0', platform.machine()), dist) - - @mock.patch('cloudinit.util.load_file') - def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists): - """Verify we get the correct name and machine arch on openSUSE - for openSUSE Tumbleweed - """ - m_os_release.return_value = OS_RELEASE_OPENSUSE_TW - m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists - dist = util.get_linux_distro() - self.assertEqual( - ('opensuse-tumbleweed', '20180920', platform.machine()), dist) - - @mock.patch('platform.system') - @mock.patch('platform.dist', create=True) - def test_get_linux_distro_no_data(self, m_platform_dist, - m_platform_system, m_path_exists): - """Verify we get no information if os-release does not exist""" - m_platform_dist.return_value = ('', '', '') - m_platform_system.return_value = "Linux" - m_path_exists.return_value = 0 - dist = util.get_linux_distro() - self.assertEqual(('', '', ''), dist) - - @mock.patch('platform.system') - @mock.patch('platform.dist', create=True) - def test_get_linux_distro_no_impl(self, m_platform_dist, - m_platform_system, m_path_exists): - """Verify we get an empty tuple when no information exists and - Exceptions are not propagated""" - m_platform_dist.side_effect = Exception() - m_platform_system.return_value = "Linux" - m_path_exists.return_value = 0 - dist = util.get_linux_distro() - self.assertEqual(('', '', ''), dist) - - @mock.patch('platform.system') - @mock.patch('platform.dist', create=True) - def test_get_linux_distro_plat_data(self, m_platform_dist, - m_platform_system, m_path_exists): - """Verify we get the correct platform information""" - m_platform_dist.return_value = ('foo', '1.1', 'aarch64') - m_platform_system.return_value = "Linux" - m_path_exists.return_value = 0 - dist = util.get_linux_distro() - self.assertEqual(('foo', '1.1', 'aarch64'), dist) - - -class TestJsonDumps(CiTestCase): - def test_is_str(self): - """json_dumps should return a string.""" - self.assertTrue(isinstance(util.json_dumps({'abc': '123'}), str)) - - def test_utf8(self): - smiley = '\\ud83d\\ude03' - self.assertEqual( - {'smiley': smiley}, - json.loads(util.json_dumps({'smiley': smiley}))) - - def test_non_utf8(self): - blob = b'\xba\x03Qx-#y\xea' - self.assertEqual( - {'blob': 'ci-b64:' + base64.b64encode(blob).decode('utf-8')}, - json.loads(util.json_dumps({'blob': blob}))) - - -@mock.patch('os.path.exists') -class TestIsLXD(CiTestCase): - - def test_is_lxd_true_on_sock_device(self, m_exists): - """When lxd's /dev/lxd/sock exists, is_lxd returns true.""" - m_exists.return_value = True - self.assertTrue(util.is_lxd()) - m_exists.assert_called_once_with('/dev/lxd/sock') - - def test_is_lxd_false_when_sock_device_absent(self, m_exists): - """When lxd's /dev/lxd/sock is absent, is_lxd returns false.""" - m_exists.return_value = False - self.assertFalse(util.is_lxd()) - m_exists.assert_called_once_with('/dev/lxd/sock') - - -class TestReadCcFromCmdline: - - @pytest.mark.parametrize( - "cmdline,expected_cfg", - [ - # Return None if cmdline has no cc:<YAML>end_cc content. - (CiTestCase.random_string(), None), - # Return None if YAML content is empty string. - ('foo cc: end_cc bar', None), - # Return expected dictionary without trailing end_cc marker. - ('foo cc: ssh_pwauth: true', {'ssh_pwauth': True}), - # Return expected dictionary w escaped newline and no end_cc. - ('foo cc: ssh_pwauth: true\\n', {'ssh_pwauth': True}), - # Return expected dictionary of yaml between cc: and end_cc. - ('foo cc: ssh_pwauth: true end_cc bar', {'ssh_pwauth': True}), - # Return dict with list value w escaped newline, no end_cc. - ( - 'cc: ssh_import_id: [smoser, kirkland]\\n', - {'ssh_import_id': ['smoser', 'kirkland']} - ), - # Parse urlencoded brackets in yaml content. - ( - 'cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc', - {'ssh_import_id': ['smoser', 'kirkland']} - ), - # Parse complete urlencoded yaml content. - ( - 'cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc', - {'ssh_import_id': ['user1', 'user2']} - ), - # Parse nested dictionary in yaml content. - ( - 'cc: ntp: {enabled: true, ntp_client: myclient} end_cc', - {'ntp': {'enabled': True, 'ntp_client': 'myclient'}} - ), - # Parse single mapping value in yaml content. - ('cc: ssh_import_id: smoser end_cc', {'ssh_import_id': 'smoser'}), - # Parse multiline content with multiple mapping and nested lists. - ( - ('cc: ssh_import_id: [smoser, bob]\\n' - 'runcmd: [ [ ls, -l ], echo hi ] end_cc'), - {'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l'], 'echo hi']} - ), - # Parse multiline encoded content w/ mappings and nested lists. - ( - ('cc: ssh_import_id: %5Bsmoser, bob%5D\\n' - 'runcmd: [ [ ls, -l ], echo hi ] end_cc'), - {'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l'], 'echo hi']} - ), - # test encoded escaped newlines work. - # - # unquote(encoded_content) - # 'ssh_import_id: [smoser, bob]\\nruncmd: [ [ ls, -l ], echo hi ]' - ( - ('cc: ' + - ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn' - 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C' - '%20echo%20hi%20%5D') + ' end_cc'), - {'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l'], 'echo hi']} - ), - # test encoded newlines work. - # - # unquote(encoded_content) - # 'ssh_import_id: [smoser, bob]\nruncmd: [ [ ls, -l ], echo hi ]' - ( - ("cc: " + - ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A' - 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C' - '%20echo%20hi%20%5D') + ' end_cc'), - {'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l'], 'echo hi']} - ), - # Parse and merge multiple yaml content sections. - ( - ('cc:ssh_import_id: [smoser, bob] end_cc ' - 'cc: runcmd: [ [ ls, -l ] ] end_cc'), - {'ssh_import_id': ['smoser', 'bob'], - 'runcmd': [['ls', '-l']]} - ), - # Parse and merge multiple encoded yaml content sections. - ( - ('cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc ' - 'cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc'), - {'ssh_import_id': ['smoser'], 'runcmd': [['ls', '-l']]} - ), - ] - ) - def test_read_conf_from_cmdline_config(self, expected_cfg, cmdline): - assert expected_cfg == util.read_conf_from_cmdline(cmdline=cmdline) - - -class TestMountCb: - """Tests for ``util.mount_cb``. - - These tests consider the "unit" under test to be ``util.mount_cb`` and - ``util.unmounter``, which is only used by ``mount_cb``. - - TODO: Test default mtype determination - TODO: Test the if/else branch that actually performs the mounting operation - """ - - @pytest.yield_fixture - def already_mounted_device_and_mountdict(self): - """Mock an already-mounted device, and yield (device, mount dict)""" - device = "/dev/fake0" - mountpoint = "/mnt/fake" - with mock.patch("cloudinit.util.subp.subp"): - with mock.patch("cloudinit.util.mounts") as m_mounts: - mounts = {device: {"mountpoint": mountpoint}} - m_mounts.return_value = mounts - yield device, mounts[device] - - @pytest.fixture - def already_mounted_device(self, already_mounted_device_and_mountdict): - """already_mounted_device_and_mountdict, but return only the device""" - return already_mounted_device_and_mountdict[0] - - @pytest.mark.parametrize( - "mtype,expected", - [ - # While the filesystem is called iso9660, the mount type is cd9660 - ("iso9660", "cd9660"), - # vfat is generally called "msdos" on BSD - ("vfat", "msdos"), - # judging from man pages, only FreeBSD has this alias - ("msdosfs", "msdos"), - # Test happy path - ("ufs", "ufs") - ], - ) - @mock.patch("cloudinit.util.is_Linux", autospec=True) - @mock.patch("cloudinit.util.is_BSD", autospec=True) - @mock.patch("cloudinit.util.subp.subp") - @mock.patch("cloudinit.temp_utils.tempdir", autospec=True) - def test_normalize_mtype_on_bsd( - self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected - ): - m_is_BSD.return_value = True - m_is_Linux.return_value = False - m_tmpdir.return_value.__enter__ = mock.Mock( - autospec=True, return_value="/tmp/fake" - ) - m_tmpdir.return_value.__exit__ = mock.Mock( - autospec=True, return_value=True - ) - callback = mock.Mock(autospec=True) - - util.mount_cb('/dev/fake0', callback, mtype=mtype) - assert mock.call( - ["mount", "-o", "ro", "-t", expected, "/dev/fake0", "/tmp/fake"], - update_env=None) in m_subp.call_args_list - - @pytest.mark.parametrize("invalid_mtype", [int(0), float(0.0), dict()]) - def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype): - with pytest.raises(TypeError): - util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype) - - @mock.patch("cloudinit.util.subp.subp") - def test_already_mounted_does_not_mount_or_umount_anything( - self, m_subp, already_mounted_device - ): - util.mount_cb(already_mounted_device, mock.Mock()) - - assert 0 == m_subp.call_count - - @pytest.mark.parametrize("trailing_slash_in_mounts", ["/", ""]) - def test_already_mounted_calls_callback( - self, trailing_slash_in_mounts, already_mounted_device_and_mountdict - ): - device, mount_dict = already_mounted_device_and_mountdict - mountpoint = mount_dict["mountpoint"] - mount_dict["mountpoint"] += trailing_slash_in_mounts - - callback = mock.Mock() - util.mount_cb(device, callback) - - # The mountpoint passed to callback should always have a trailing - # slash, regardless of the input - assert [mock.call(mountpoint + "/")] == callback.call_args_list - - def test_already_mounted_calls_callback_with_data( - self, already_mounted_device - ): - callback = mock.Mock() - util.mount_cb( - already_mounted_device, callback, data=mock.sentinel.data - ) - - assert [ - mock.call(mock.ANY, mock.sentinel.data) - ] == callback.call_args_list - - -@mock.patch("cloudinit.util.write_file") -class TestEnsureFile: - """Tests for ``cloudinit.util.ensure_file``.""" - - def test_parameters_passed_through(self, m_write_file): - """Test the parameters in the signature are passed to write_file.""" - util.ensure_file( - mock.sentinel.path, - mode=mock.sentinel.mode, - preserve_mode=mock.sentinel.preserve_mode, - ) - - assert 1 == m_write_file.call_count - args, kwargs = m_write_file.call_args - assert (mock.sentinel.path,) == args - assert mock.sentinel.mode == kwargs["mode"] - assert mock.sentinel.preserve_mode == kwargs["preserve_mode"] - - @pytest.mark.parametrize( - "kwarg,expected", - [ - # Files should be world-readable by default - ("mode", 0o644), - # The previous behaviour of not preserving mode should be retained - ("preserve_mode", False), - ], - ) - def test_defaults(self, m_write_file, kwarg, expected): - """Test that ensure_file defaults appropriately.""" - util.ensure_file(mock.sentinel.path) - - assert 1 == m_write_file.call_count - _args, kwargs = m_write_file.call_args - assert expected == kwargs[kwarg] - - def test_static_parameters_are_passed(self, m_write_file): - """Test that the static write_files parameters are passed correctly.""" - util.ensure_file(mock.sentinel.path) - - assert 1 == m_write_file.call_count - _args, kwargs = m_write_file.call_args - assert "" == kwargs["content"] - assert "ab" == kwargs["omode"] - - -# vi: ts=4 expandtab diff --git a/cloudinit/tests/test_version.py b/cloudinit/tests/test_version.py deleted file mode 100644 index 778a762c..00000000 --- a/cloudinit/tests/test_version.py +++ /dev/null @@ -1,31 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -from unittest import mock - -from cloudinit.tests.helpers import CiTestCase -from cloudinit import version - - -class TestExportsFeatures(CiTestCase): - def test_has_network_config_v1(self): - self.assertIn('NETWORK_CONFIG_V1', version.FEATURES) - - def test_has_network_config_v2(self): - self.assertIn('NETWORK_CONFIG_V2', version.FEATURES) - - -class TestVersionString(CiTestCase): - @mock.patch("cloudinit.version._PACKAGED_VERSION", - "17.2-3-gb05b9972-0ubuntu1") - def test_package_version_respected(self): - """If _PACKAGED_VERSION is filled in, then it should be returned.""" - self.assertEqual("17.2-3-gb05b9972-0ubuntu1", version.version_string()) - - @mock.patch("cloudinit.version._PACKAGED_VERSION", "@@PACKAGED_VERSION@@") - @mock.patch("cloudinit.version.__VERSION__", "17.2") - def test_package_version_skipped(self): - """If _PACKAGED_VERSION is not modified, then return __VERSION__.""" - self.assertEqual("17.2", version.version_string()) - - -# vi: ts=4 expandtab |