author     zsdc <taras@vyos.io>  2022-03-25 20:58:01 +0200
committer  zsdc <taras@vyos.io>  2022-03-25 21:42:00 +0200
commit     31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba (patch)
tree       349631a02467dae0158f6f663cc8aa8537974a97 /cloudinit/tests
parent     5c4b3943343a85fbe517e5ec1fc670b3a8566b4b (diff)
parent     8537237d80a48c8f0cbf8e66aa4826bbc882b022 (diff)
download   vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.tar.gz
           vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.zip
T2117: Cloud-init updated to 22.1
Merged with the 22.1 tag from the upstream Cloud-init repository. Our modules were slightly modified for compatibility with the new version.
Diffstat (limited to 'cloudinit/tests')
-rw-r--r--  cloudinit/tests/__init__.py               0
-rw-r--r--  cloudinit/tests/helpers.py              504
-rw-r--r--  cloudinit/tests/test_conftest.py         65
-rw-r--r--  cloudinit/tests/test_dhclient_hook.py   105
-rw-r--r--  cloudinit/tests/test_dmi.py             154
-rw-r--r--  cloudinit/tests/test_features.py         60
-rw-r--r--  cloudinit/tests/test_gpg.py              55
-rw-r--r--  cloudinit/tests/test_netinfo.py         181
-rw-r--r--  cloudinit/tests/test_persistence.py     127
-rw-r--r--  cloudinit/tests/test_simpletable.py     106
-rw-r--r--  cloudinit/tests/test_stages.py          406
-rw-r--r--  cloudinit/tests/test_subp.py            286
-rw-r--r--  cloudinit/tests/test_temp_utils.py      117
-rw-r--r--  cloudinit/tests/test_upgrade.py          45
-rw-r--r--  cloudinit/tests/test_url_helper.py      174
-rw-r--r--  cloudinit/tests/test_util.py            854
-rw-r--r--  cloudinit/tests/test_version.py          31
17 files changed, 0 insertions(+), 3270 deletions(-)
diff --git a/cloudinit/tests/__init__.py b/cloudinit/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cloudinit/tests/__init__.py
+++ /dev/null
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
deleted file mode 100644
index 58f63b69..00000000
--- a/cloudinit/tests/helpers.py
+++ /dev/null
@@ -1,504 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import functools
-import httpretty
-import io
-import logging
-import os
-import random
-import shutil
-import string
-import sys
-import tempfile
-import time
-import unittest
-from contextlib import ExitStack, contextmanager
-from unittest import mock
-from unittest.util import strclass
-
-from cloudinit.config.schema import (
- SchemaValidationError, validate_cloudconfig_schema)
-from cloudinit import cloud
-from cloudinit import distros
-from cloudinit import helpers as ch
-from cloudinit.sources import DataSourceNone
-from cloudinit.templater import JINJA_AVAILABLE
-from cloudinit import subp
-from cloudinit import util
-
-_real_subp = subp.subp
-
-# Used for skipping tests
-SkipTest = unittest.SkipTest
-skipIf = unittest.skipIf
-
-
-# Makes the old path start
-# with new base instead of whatever
-# it previously had
-def rebase_path(old_path, new_base):
- if old_path.startswith(new_base):
- # Already handled...
- return old_path
- # Retarget the base of that path
- # to the new base instead of the
- # old one...
- path = os.path.join(new_base, old_path.lstrip("/"))
- path = os.path.abspath(path)
- return path
-
-
-# Can work on anything that takes a path as arguments
-def retarget_many_wrapper(new_base, am, old_func):
- def wrapper(*args, **kwds):
- n_args = list(args)
- nam = am
- if am == -1:
- nam = len(n_args)
- for i in range(0, nam):
- path = args[i]
- # patchOS() wraps various os and os.path functions, however in
- # Python 3 some of these now accept file-descriptors (integers).
- # That breaks rebase_path() so in lieu of a better solution, just
- # don't rebase if we get a fd.
- if isinstance(path, str):
- n_args[i] = rebase_path(path, new_base)
- return old_func(*n_args, **kwds)
- return wrapper
-
-
-class TestCase(unittest.TestCase):
-
- def reset_global_state(self):
- """Reset any global state to its original settings.
-
- cloudinit caches some values in cloudinit.util. Unit tests that
- involved those cached paths were then subject to failure if the order
- of invocation changed (LP: #1703697).
-
- This function resets any of these global state variables to their
- initial state.
-
- In the future this should really be done with some registry that
- can then be cleaned in a more obvious way.
- """
- util.PROC_CMDLINE = None
- util._DNS_REDIRECT_IP = None
- util._LSB_RELEASE = {}
-
- def setUp(self):
- super(TestCase, self).setUp()
- self.reset_global_state()
-
- def shortDescription(self):
- return strclass(self.__class__) + '.' + self._testMethodName
-
- def add_patch(self, target, attr, *args, **kwargs):
- """Patches specified target object and sets it as attr on test
- instance also schedules cleanup"""
- if 'autospec' not in kwargs:
- kwargs['autospec'] = True
- m = mock.patch(target, *args, **kwargs)
- p = m.start()
- self.addCleanup(m.stop)
- setattr(self, attr, p)
-
-
-class CiTestCase(TestCase):
- """This is the preferred test case base class unless user
- needs other test case classes below."""
-
- # Subclass overrides for specific test behavior
- # Whether or not a unit test needs logfile setup
- with_logs = False
- allowed_subp = False
- SUBP_SHELL_TRUE = "shell=true"
-
- @contextmanager
- def allow_subp(self, allowed_subp):
- orig = self.allowed_subp
- try:
- self.allowed_subp = allowed_subp
- yield
- finally:
- self.allowed_subp = orig
-
- def setUp(self):
- super(CiTestCase, self).setUp()
- if self.with_logs:
- # Create a log handler so unit tests can search expected logs.
- self.logger = logging.getLogger()
- self.logs = io.StringIO()
- formatter = logging.Formatter('%(levelname)s: %(message)s')
- handler = logging.StreamHandler(self.logs)
- handler.setFormatter(formatter)
- self.old_handlers = self.logger.handlers
- self.logger.handlers = [handler]
- if self.allowed_subp is True:
- subp.subp = _real_subp
- else:
- subp.subp = self._fake_subp
-
- def _fake_subp(self, *args, **kwargs):
- if 'args' in kwargs:
- cmd = kwargs['args']
- else:
- if not args:
- raise TypeError(
- "subp() missing 1 required positional argument: 'args'")
- cmd = args[0]
-
- if not isinstance(cmd, str):
- cmd = cmd[0]
- pass_through = False
- if not isinstance(self.allowed_subp, (list, bool)):
- raise TypeError("self.allowed_subp supports list or bool.")
- if isinstance(self.allowed_subp, bool):
- pass_through = self.allowed_subp
- else:
- pass_through = (
- (cmd in self.allowed_subp) or
- (self.SUBP_SHELL_TRUE in self.allowed_subp and
- kwargs.get('shell')))
- if pass_through:
- return _real_subp(*args, **kwargs)
- raise Exception(
- "called subp. set self.allowed_subp=True to allow\n subp(%s)" %
- ', '.join([str(repr(a)) for a in args] +
- ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]))
-
- def tearDown(self):
- if self.with_logs:
- # Remove the handler we setup
- logging.getLogger().handlers = self.old_handlers
- logging.getLogger().level = None
- subp.subp = _real_subp
- super(CiTestCase, self).tearDown()
-
- def tmp_dir(self, dir=None, cleanup=True):
- # return a full path to a temporary directory that will be cleaned up.
- if dir is None:
- tmpd = tempfile.mkdtemp(
- prefix="ci-%s." % self.__class__.__name__)
- else:
- tmpd = tempfile.mkdtemp(dir=dir)
- self.addCleanup(
- functools.partial(shutil.rmtree, tmpd, ignore_errors=True))
- return tmpd
-
- def tmp_path(self, path, dir=None):
- # return an absolute path to 'path' under dir.
- # if dir is None, one will be created with tmp_dir()
- # the file is not created or modified.
- if dir is None:
- dir = self.tmp_dir()
- return os.path.normpath(os.path.abspath(os.path.join(dir, path)))
-
- def tmp_cloud(self, distro, sys_cfg=None, metadata=None):
- """Create a cloud with tmp working directory paths.
-
- @param distro: Name of the distro to attach to the cloud.
- @param metadata: Optional metadata to set on the datasource.
-
- @return: The built cloud instance.
- """
- self.new_root = self.tmp_dir()
- if not sys_cfg:
- sys_cfg = {}
- tmp_paths = {}
- for var in ['templates_dir', 'run_dir', 'cloud_dir']:
- tmp_paths[var] = self.tmp_path(var, dir=self.new_root)
- util.ensure_dir(tmp_paths[var])
- self.paths = ch.Paths(tmp_paths)
- cls = distros.fetch(distro)
- mydist = cls(distro, sys_cfg, self.paths)
- myds = DataSourceNone.DataSourceNone(sys_cfg, mydist, self.paths)
- if metadata:
- myds.metadata.update(metadata)
- return cloud.Cloud(myds, self.paths, sys_cfg, mydist, None)
-
- @classmethod
- def random_string(cls, length=8):
- """ return a random lowercase string with default length of 8"""
- return ''.join(
- random.choice(string.ascii_lowercase) for _ in range(length))
-
-
-class ResourceUsingTestCase(CiTestCase):
-
- def setUp(self):
- super(ResourceUsingTestCase, self).setUp()
- self.resource_path = None
-
- def getCloudPaths(self, ds=None):
- tmpdir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, tmpdir)
- cp = ch.Paths({'cloud_dir': tmpdir,
- 'templates_dir': resourceLocation()},
- ds=ds)
- return cp
-
-
-class FilesystemMockingTestCase(ResourceUsingTestCase):
-
- def setUp(self):
- super(FilesystemMockingTestCase, self).setUp()
- self.patched_funcs = ExitStack()
-
- def tearDown(self):
- self.patched_funcs.close()
- ResourceUsingTestCase.tearDown(self)
-
- def replicateTestRoot(self, example_root, target_root):
- real_root = resourceLocation()
- real_root = os.path.join(real_root, 'roots', example_root)
- for (dir_path, _dirnames, filenames) in os.walk(real_root):
- real_path = dir_path
- make_path = rebase_path(real_path[len(real_root):], target_root)
- util.ensure_dir(make_path)
- for f in filenames:
- real_path = util.abs_join(real_path, f)
- make_path = util.abs_join(make_path, f)
- shutil.copy(real_path, make_path)
-
- def patchUtils(self, new_root):
- patch_funcs = {
- util: [('write_file', 1),
- ('append_file', 1),
- ('load_file', 1),
- ('ensure_dir', 1),
- ('chmod', 1),
- ('delete_dir_contents', 1),
- ('del_file', 1),
- ('sym_link', -1),
- ('copy', -1)],
- }
- for (mod, funcs) in patch_funcs.items():
- for (f, am) in funcs:
- func = getattr(mod, f)
- trap_func = retarget_many_wrapper(new_root, am, func)
- self.patched_funcs.enter_context(
- mock.patch.object(mod, f, trap_func))
-
- # Handle subprocess calls
- func = getattr(subp, 'subp')
-
- def nsubp(*_args, **_kwargs):
- return ('', '')
-
- self.patched_funcs.enter_context(
- mock.patch.object(subp, 'subp', nsubp))
-
- def null_func(*_args, **_kwargs):
- return None
-
- for f in ['chownbyid', 'chownbyname']:
- self.patched_funcs.enter_context(
- mock.patch.object(util, f, null_func))
-
- def patchOS(self, new_root):
- patch_funcs = {
- os.path: [('isfile', 1), ('exists', 1),
- ('islink', 1), ('isdir', 1), ('lexists', 1)],
- os: [('listdir', 1), ('mkdir', 1),
- ('lstat', 1), ('symlink', 2),
- ('stat', 1)]
- }
-
- if hasattr(os, 'scandir'):
- # py27 does not have scandir
- patch_funcs[os].append(('scandir', 1))
-
- for (mod, funcs) in patch_funcs.items():
- for f, nargs in funcs:
- func = getattr(mod, f)
- trap_func = retarget_many_wrapper(new_root, nargs, func)
- self.patched_funcs.enter_context(
- mock.patch.object(mod, f, trap_func))
-
- def patchOpen(self, new_root):
- trap_func = retarget_many_wrapper(new_root, 1, open)
- self.patched_funcs.enter_context(
- mock.patch('builtins.open', trap_func)
- )
-
- def patchStdoutAndStderr(self, stdout=None, stderr=None):
- if stdout is not None:
- self.patched_funcs.enter_context(
- mock.patch.object(sys, 'stdout', stdout))
- if stderr is not None:
- self.patched_funcs.enter_context(
- mock.patch.object(sys, 'stderr', stderr))
-
- def reRoot(self, root=None):
- if root is None:
- root = self.tmp_dir()
- self.patchUtils(root)
- self.patchOS(root)
- self.patchOpen(root)
- return root
-
- @contextmanager
- def reRooted(self, root=None):
- try:
- yield self.reRoot(root)
- finally:
- self.patched_funcs.close()
-
-
-class HttprettyTestCase(CiTestCase):
- # necessary as http_proxy gets in the way of httpretty
- # https://github.com/gabrielfalcao/HTTPretty/issues/122
- # Also make sure that allow_net_connect is set to False.
- # And make sure reset and enable/disable are done.
-
- def setUp(self):
- self.restore_proxy = os.environ.get('http_proxy')
- if self.restore_proxy is not None:
- del os.environ['http_proxy']
- super(HttprettyTestCase, self).setUp()
- httpretty.HTTPretty.allow_net_connect = False
- httpretty.reset()
- httpretty.enable()
-
- def tearDown(self):
- httpretty.disable()
- httpretty.reset()
- if self.restore_proxy:
- os.environ['http_proxy'] = self.restore_proxy
- super(HttprettyTestCase, self).tearDown()
-
-
-class SchemaTestCaseMixin(unittest.TestCase):
-
- def assertSchemaValid(self, cfg, msg="Valid Schema failed validation."):
- """Assert the config is valid per self.schema.
-
- If there is only one top level key in the schema properties, then
- the cfg will be put under that key."""
- props = list(self.schema.get('properties'))
- # put cfg under top level key if there is only one in the schema
- if len(props) == 1:
- cfg = {props[0]: cfg}
- try:
- validate_cloudconfig_schema(cfg, self.schema, strict=True)
- except SchemaValidationError:
- self.fail(msg)
-
-
-def populate_dir(path, files):
- if not os.path.exists(path):
- os.makedirs(path)
- ret = []
- for (name, content) in files.items():
- p = os.path.sep.join([path, name])
- util.ensure_dir(os.path.dirname(p))
- with open(p, "wb") as fp:
- if isinstance(content, bytes):
- fp.write(content)
- else:
- fp.write(content.encode('utf-8'))
- fp.close()
- ret.append(p)
-
- return ret
-
-
-def populate_dir_with_ts(path, data):
- """data is {'file': ('contents', mtime)}. mtime relative to now."""
- populate_dir(path, dict((k, v[0]) for k, v in data.items()))
- btime = time.time()
- for fpath, (_contents, mtime) in data.items():
- ts = btime + mtime if mtime else btime
- os.utime(os.path.sep.join((path, fpath)), (ts, ts))
-
-
-def dir2dict(startdir, prefix=None):
- flist = {}
- if prefix is None:
- prefix = startdir
- for root, _dirs, files in os.walk(startdir):
- for fname in files:
- fpath = os.path.join(root, fname)
- key = fpath[len(prefix):]
- flist[key] = util.load_file(fpath)
- return flist
-
-
-def wrap_and_call(prefix, mocks, func, *args, **kwargs):
- """
- call func(*args, **kwargs) with mocks applied, then unapply the mocks;
- nicer to read than repeating decorators on each function
-
- prefix: prefix for mock names (e.g. 'cloudinit.stages.util') or None
- mocks: dictionary of names (under 'prefix') to mock and either
- a return value or a dictionary to pass to the mock.patch call
- func: function to call with mocks applied
- *args,**kwargs: arguments for 'func'
-
- return_value: return from 'func'
- """
- delim = '.'
- if prefix is None:
- prefix = ''
- prefix = prefix.rstrip(delim)
- unwraps = []
- for fname, kw in mocks.items():
- if prefix:
- fname = delim.join((prefix, fname))
- if not isinstance(kw, dict):
- kw = {'return_value': kw}
- p = mock.patch(fname, **kw)
- p.start()
- unwraps.append(p)
- try:
- return func(*args, **kwargs)
- finally:
- for p in unwraps:
- p.stop()
-
-
-def resourceLocation(subname=None):
- path = os.path.join('tests', 'data')
- if not subname:
- return path
- return os.path.join(path, subname)
-
-
-def readResource(name, mode='r'):
- with open(resourceLocation(name), mode) as fh:
- return fh.read()
-
-
-try:
- import jsonschema
- assert jsonschema # avoid pyflakes error F401: import unused
- _missing_jsonschema_dep = False
-except ImportError:
- _missing_jsonschema_dep = True
-
-
-def skipUnlessJsonSchema():
- return skipIf(
- _missing_jsonschema_dep, "No python-jsonschema dependency present.")
-
-
-def skipUnlessJinja():
- return skipIf(not JINJA_AVAILABLE, "No jinja dependency present.")
-
-
-def skipIfJinja():
- return skipIf(JINJA_AVAILABLE, "Jinja dependency present.")
-
-
-# older versions of mock do not have the useful 'assert_not_called'
-if not hasattr(mock.Mock, 'assert_not_called'):
- def __mock_assert_not_called(mmock):
- if mmock.call_count != 0:
- msg = ("[citest] Expected '%s' to not have been called. "
- "Called %s times." %
- (mmock._mock_name or 'mock', mmock.call_count))
- raise AssertionError(msg)
- mock.Mock.assert_not_called = __mock_assert_not_called
-
-# vi: ts=4 expandtab
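A minimal usage sketch (not part of the diff, and assuming the pre-22.1 layout in which these helpers lived at cloudinit.tests.helpers): a typical unit test subclasses CiTestCase, sets with_logs to capture log output in self.logs, leaves allowed_subp at False so any subp.subp() call raises, and uses tmp_dir()/tmp_path() for auto-cleaned scratch paths. The test class and file names below are illustrative only.

from cloudinit import util
from cloudinit.tests.helpers import CiTestCase


class TestWriteFileExample(CiTestCase):
    with_logs = True       # captured log output is readable via self.logs
    allowed_subp = False   # any subp.subp() call raises instead of running

    def test_write_and_read_back(self):
        tmpd = self.tmp_dir()                             # removed on cleanup
        target = self.tmp_path('sub/file.txt', dir=tmpd)  # path only, no file
        util.write_file(target, 'hello')                  # creates parent dirs
        self.assertEqual('hello', util.load_file(target))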
diff --git a/cloudinit/tests/test_conftest.py b/cloudinit/tests/test_conftest.py
deleted file mode 100644
index 6f1263a5..00000000
--- a/cloudinit/tests/test_conftest.py
+++ /dev/null
@@ -1,65 +0,0 @@
-import pytest
-
-from cloudinit import subp
-from cloudinit.tests.helpers import CiTestCase
-
-
-class TestDisableSubpUsage:
- """Test that the disable_subp_usage fixture behaves as expected."""
-
- def test_using_subp_raises_assertion_error(self):
- with pytest.raises(AssertionError):
- subp.subp(["some", "args"])
-
- def test_typeerrors_on_incorrect_usage(self):
- with pytest.raises(TypeError):
- # We are intentionally passing no value for a parameter, so:
- # pylint: disable=no-value-for-parameter
- subp.subp()
-
- @pytest.mark.allow_all_subp
- def test_subp_usage_can_be_reenabled(self):
- subp.subp(['whoami'])
-
- @pytest.mark.allow_subp_for("whoami")
- def test_subp_usage_can_be_conditionally_reenabled(self):
- # The two parameters test each potential invocation with a single
- # argument
- with pytest.raises(AssertionError) as excinfo:
- subp.subp(["some", "args"])
- assert "allowed: whoami" in str(excinfo.value)
- subp.subp(['whoami'])
-
- @pytest.mark.allow_subp_for("whoami", "bash")
- def test_subp_usage_can_be_conditionally_reenabled_for_multiple_cmds(self):
- with pytest.raises(AssertionError) as excinfo:
- subp.subp(["some", "args"])
- assert "allowed: whoami,bash" in str(excinfo.value)
- subp.subp(['bash', '-c', 'true'])
- subp.subp(['whoami'])
-
- @pytest.mark.allow_all_subp
- @pytest.mark.allow_subp_for("bash")
- def test_both_marks_raise_an_error(self):
- with pytest.raises(AssertionError, match="marked both"):
- subp.subp(["bash"])
-
-
-class TestDisableSubpUsageInTestSubclass(CiTestCase):
- """Test that disable_subp_usage doesn't impact CiTestCase's subp logic."""
-
- def test_using_subp_raises_exception(self):
- with pytest.raises(Exception):
- subp.subp(["some", "args"])
-
- def test_typeerrors_on_incorrect_usage(self):
- with pytest.raises(TypeError):
- subp.subp()
-
- def test_subp_usage_can_be_reenabled(self):
- _old_allowed_subp = self.allowed_subp
- self.allowed_subp = True
- try:
- subp.subp(['bash', '-c', 'true'])
- finally:
- self.allowed_subp = _old_allowed_subp
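For pytest-style tests the same opt-in is done with the markers exercised above rather than the allowed_subp attribute. A hedged sketch, assuming the autouse disable_subp_usage fixture from the project's conftest.py (not shown in this diff) is active; the function name is illustrative.

import pytest

from cloudinit import subp


@pytest.mark.allow_subp_for("whoami")
def test_whoami_is_whitelisted():
    # Only 'whoami' may actually execute; any other command makes the
    # disable_subp_usage fixture raise an AssertionError.
    out, _err = subp.subp(["whoami"])
    assert out.strip()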
diff --git a/cloudinit/tests/test_dhclient_hook.py b/cloudinit/tests/test_dhclient_hook.py
deleted file mode 100644
index eadae81c..00000000
--- a/cloudinit/tests/test_dhclient_hook.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests for cloudinit.dhclient_hook."""
-
-from cloudinit import dhclient_hook as dhc
-from cloudinit.tests.helpers import CiTestCase, dir2dict, populate_dir
-
-import argparse
-import json
-import os
-from unittest import mock
-
-
-class TestDhclientHook(CiTestCase):
-
- ex_env = {
- 'interface': 'eth0',
- 'new_dhcp_lease_time': '3600',
- 'new_host_name': 'x1',
- 'new_ip_address': '10.145.210.163',
- 'new_subnet_mask': '255.255.255.0',
- 'old_host_name': 'x1',
- 'PATH': '/usr/sbin:/usr/bin:/sbin:/bin',
- 'pid': '614',
- 'reason': 'BOUND',
- }
-
- # some older versions of dhclient put the same content,
- # but in upper case with DHCP4_ instead of new_
- ex_env_dhcp4 = {
- 'REASON': 'BOUND',
- 'DHCP4_dhcp_lease_time': '3600',
- 'DHCP4_host_name': 'x1',
- 'DHCP4_ip_address': '10.145.210.163',
- 'DHCP4_subnet_mask': '255.255.255.0',
- 'INTERFACE': 'eth0',
- 'PATH': '/usr/sbin:/usr/bin:/sbin:/bin',
- 'pid': '614',
- }
-
- expected = {
- 'dhcp_lease_time': '3600',
- 'host_name': 'x1',
- 'ip_address': '10.145.210.163',
- 'subnet_mask': '255.255.255.0'}
-
- def setUp(self):
- super(TestDhclientHook, self).setUp()
- self.tmp = self.tmp_dir()
-
- def test_handle_args(self):
- """quick test of call to handle_args."""
- nic = 'eth0'
- args = argparse.Namespace(event=dhc.UP, interface=nic)
- with mock.patch.dict("os.environ", clear=True, values=self.ex_env):
- dhc.handle_args(dhc.NAME, args, data_d=self.tmp)
- found = dir2dict(self.tmp + os.path.sep)
- self.assertEqual([nic + ".json"], list(found.keys()))
- self.assertEqual(self.expected, json.loads(found[nic + ".json"]))
-
- def test_run_hook_up_creates_dir(self):
- """If dir does not exist, run_hook should create it."""
- subd = self.tmp_path("subdir", self.tmp)
- nic = 'eth1'
- dhc.run_hook(nic, 'up', data_d=subd, env=self.ex_env)
- self.assertEqual(
- set([nic + ".json"]), set(dir2dict(subd + os.path.sep)))
-
- def test_run_hook_up(self):
- """Test expected use of run_hook_up."""
- nic = 'eth0'
- dhc.run_hook(nic, 'up', data_d=self.tmp, env=self.ex_env)
- found = dir2dict(self.tmp + os.path.sep)
- self.assertEqual([nic + ".json"], list(found.keys()))
- self.assertEqual(self.expected, json.loads(found[nic + ".json"]))
-
- def test_run_hook_up_dhcp4_prefix(self):
- """Test run_hook filters correctly with older DHCP4_ data."""
- nic = 'eth0'
- dhc.run_hook(nic, 'up', data_d=self.tmp, env=self.ex_env_dhcp4)
- found = dir2dict(self.tmp + os.path.sep)
- self.assertEqual([nic + ".json"], list(found.keys()))
- self.assertEqual(self.expected, json.loads(found[nic + ".json"]))
-
- def test_run_hook_down_deletes(self):
- """down should delete the created json file."""
- nic = 'eth1'
- populate_dir(
- self.tmp, {nic + ".json": "{'abcd'}", 'myfile.txt': 'text'})
- dhc.run_hook(nic, 'down', data_d=self.tmp, env={'old_host_name': 'x1'})
- self.assertEqual(
- set(['myfile.txt']),
- set(dir2dict(self.tmp + os.path.sep)))
-
- def test_get_parser(self):
- """Smoke test creation of get_parser."""
- # cloud-init main uses 'action'.
- event, interface = (dhc.UP, 'mynic0')
- self.assertEqual(
- argparse.Namespace(event=event, interface=interface,
- action=(dhc.NAME, dhc.handle_args)),
- dhc.get_parser().parse_args([event, interface]))
-
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_dmi.py b/cloudinit/tests/test_dmi.py
deleted file mode 100644
index 78a72122..00000000
--- a/cloudinit/tests/test_dmi.py
+++ /dev/null
@@ -1,154 +0,0 @@
-from cloudinit.tests import helpers
-from cloudinit import dmi
-from cloudinit import util
-from cloudinit import subp
-
-import os
-import tempfile
-import shutil
-from unittest import mock
-
-
-class TestReadDMIData(helpers.FilesystemMockingTestCase):
-
- def setUp(self):
- super(TestReadDMIData, self).setUp()
- self.new_root = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.new_root)
- self.reRoot(self.new_root)
- p = mock.patch("cloudinit.dmi.is_container", return_value=False)
- self.addCleanup(p.stop)
- self._m_is_container = p.start()
- p = mock.patch("cloudinit.dmi.is_FreeBSD", return_value=False)
- self.addCleanup(p.stop)
- self._m_is_FreeBSD = p.start()
-
- def _create_sysfs_parent_directory(self):
- util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id'))
-
- def _create_sysfs_file(self, key, content):
- """Mocks the sys path found on Linux systems."""
- self._create_sysfs_parent_directory()
- dmi_key = "/sys/class/dmi/id/{0}".format(key)
- util.write_file(dmi_key, content)
-
- def _configure_dmidecode_return(self, key, content, error=None):
- """
- In order to test a missing sys path and call outs to dmidecode, this
- function fakes the results of dmidecode to test the results.
- """
- def _dmidecode_subp(cmd):
- if cmd[-1] != key:
- raise subp.ProcessExecutionError()
- return (content, error)
-
- self.patched_funcs.enter_context(
- mock.patch("cloudinit.dmi.subp.which", side_effect=lambda _: True))
- self.patched_funcs.enter_context(
- mock.patch("cloudinit.dmi.subp.subp", side_effect=_dmidecode_subp))
-
- def _configure_kenv_return(self, key, content, error=None):
- """
- In order to test a FreeBSD system call outs to kenv, this
- function fakes the results of kenv to test the results.
- """
- def _kenv_subp(cmd):
- if cmd[-1] != dmi.DMIDECODE_TO_KERNEL[key].freebsd:
- raise subp.ProcessExecutionError()
- return (content, error)
-
- self.patched_funcs.enter_context(
- mock.patch("cloudinit.dmi.subp.subp", side_effect=_kenv_subp))
-
- def patch_mapping(self, new_mapping):
- self.patched_funcs.enter_context(
- mock.patch('cloudinit.dmi.DMIDECODE_TO_KERNEL',
- new_mapping))
-
- def test_sysfs_used_with_key_in_mapping_and_file_on_disk(self):
- self.patch_mapping({'mapped-key': dmi.kdmi('mapped-value', None)})
- expected_dmi_value = 'sys-used-correctly'
- self._create_sysfs_file('mapped-value', expected_dmi_value)
- self._configure_dmidecode_return('mapped-key', 'wrong-wrong-wrong')
- self.assertEqual(expected_dmi_value, dmi.read_dmi_data('mapped-key'))
-
- def test_dmidecode_used_if_no_sysfs_file_on_disk(self):
- self.patch_mapping({})
- self._create_sysfs_parent_directory()
- expected_dmi_value = 'dmidecode-used'
- self._configure_dmidecode_return('use-dmidecode', expected_dmi_value)
- with mock.patch("cloudinit.util.os.uname") as m_uname:
- m_uname.return_value = ('x-sysname', 'x-nodename',
- 'x-release', 'x-version', 'x86_64')
- self.assertEqual(expected_dmi_value,
- dmi.read_dmi_data('use-dmidecode'))
-
- def test_dmidecode_not_used_on_arm(self):
- self.patch_mapping({})
- print("current =%s", subp)
- self._create_sysfs_parent_directory()
- dmi_val = 'from-dmidecode'
- dmi_name = 'use-dmidecode'
- self._configure_dmidecode_return(dmi_name, dmi_val)
- print("now =%s", subp)
-
- expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val}
- found = {}
- # we do not run the 'dmi-decode' binary on some arches
- # verify that anything requested that is not in the sysfs dir
- # will return None on those arches.
- with mock.patch("cloudinit.util.os.uname") as m_uname:
- for arch in expected:
- m_uname.return_value = ('x-sysname', 'x-nodename',
- 'x-release', 'x-version', arch)
- print("now2 =%s", subp)
- found[arch] = dmi.read_dmi_data(dmi_name)
- self.assertEqual(expected, found)
-
- def test_none_returned_if_neither_source_has_data(self):
- self.patch_mapping({})
- self._configure_dmidecode_return('key', 'value')
- self.assertIsNone(dmi.read_dmi_data('expect-fail'))
-
- def test_none_returned_if_dmidecode_not_in_path(self):
- self.patched_funcs.enter_context(
- mock.patch.object(subp, 'which', lambda _: False))
- self.patch_mapping({})
- self.assertIsNone(dmi.read_dmi_data('expect-fail'))
-
- def test_empty_string_returned_instead_of_foxfox(self):
- # uninitialized dmi values show as \xff, return empty string
- my_len = 32
- dmi_value = b'\xff' * my_len + b'\n'
- expected = ""
- dmi_key = 'system-product-name'
- sysfs_key = 'product_name'
- self._create_sysfs_file(sysfs_key, dmi_value)
- self.assertEqual(expected, dmi.read_dmi_data(dmi_key))
-
- def test_container_returns_none(self):
- """In a container read_dmi_data should always return None."""
-
- # first verify we get the value if not in container
- self._m_is_container.return_value = False
- key, val = ("system-product-name", "my_product")
- self._create_sysfs_file('product_name', val)
- self.assertEqual(val, dmi.read_dmi_data(key))
-
- # then verify in container returns None
- self._m_is_container.return_value = True
- self.assertIsNone(dmi.read_dmi_data(key))
-
- def test_container_returns_none_on_unknown(self):
- """In a container even bogus keys return None."""
- self._m_is_container.return_value = True
- self._create_sysfs_file('product_name', "should-be-ignored")
- self.assertIsNone(dmi.read_dmi_data("bogus"))
- self.assertIsNone(dmi.read_dmi_data("system-product-name"))
-
- def test_freebsd_uses_kenv(self):
- """On a FreeBSD system, kenv is called."""
- self._m_is_FreeBSD.return_value = True
- key, val = ("system-product-name", "my_product")
- self._configure_kenv_return(key, val)
- self.assertEqual(dmi.read_dmi_data(key), val)
diff --git a/cloudinit/tests/test_features.py b/cloudinit/tests/test_features.py
deleted file mode 100644
index d7a7226d..00000000
--- a/cloudinit/tests/test_features.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-# pylint: disable=no-member,no-name-in-module
-"""
-This file is for testing the feature flag functionality itself,
-NOT for testing any individual feature flag
-"""
-import pytest
-import sys
-from pathlib import Path
-
-import cloudinit
-
-
-@pytest.yield_fixture()
-def create_override(request):
- """
- Create a feature overrides file and do some module wizardry to make
- it seem like we're importing the features file for the first time.
-
- After creating the override file with the values passed by the test,
- we need to reload cloudinit.features
- to get all of the current features (including the overridden ones).
- Once the test is complete, we remove the file we created and set
- features and feature_overrides modules to how they were before
- the test started
- """
- override_path = Path(cloudinit.__file__).parent / 'feature_overrides.py'
- if override_path.exists():
- raise Exception("feature_overrides.py unexpectedly exists! "
- "Remove it to run this test.")
- with override_path.open('w') as f:
- for key, value in request.param.items():
- f.write('{} = {}\n'.format(key, value))
-
- sys.modules.pop('cloudinit.features', None)
-
- yield
-
- override_path.unlink()
- sys.modules.pop('cloudinit.feature_overrides', None)
-
-
-class TestFeatures:
- def test_feature_without_override(self):
- from cloudinit.features import ERROR_ON_USER_DATA_FAILURE
- assert ERROR_ON_USER_DATA_FAILURE is True
-
- @pytest.mark.parametrize('create_override',
- [{'ERROR_ON_USER_DATA_FAILURE': False}],
- indirect=True)
- def test_feature_with_override(self, create_override):
- from cloudinit.features import ERROR_ON_USER_DATA_FAILURE
- assert ERROR_ON_USER_DATA_FAILURE is False
-
- @pytest.mark.parametrize('create_override',
- [{'SPAM': True}],
- indirect=True)
- def test_feature_only_in_override(self, create_override):
- from cloudinit.features import SPAM
- assert SPAM is True
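The fixture above exercises cloud-init's feature-override mechanism: a feature_overrides.py dropped next to cloudinit/features.py shadows the defaults the next time cloudinit.features is imported. A small sketch of the consumer's view (not part of the diff); the override value mirrors the parametrized test above, and forcing the re-import with sys.modules.pop matches what the fixture does.

# Hypothetical cloudinit/feature_overrides.py, as created by the fixture:
#
#     ERROR_ON_USER_DATA_FAILURE = False
#
# After forcing a fresh import, consumers see the override:
import sys

sys.modules.pop('cloudinit.features', None)
from cloudinit.features import ERROR_ON_USER_DATA_FAILURE

assert ERROR_ON_USER_DATA_FAILURE is False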
diff --git a/cloudinit/tests/test_gpg.py b/cloudinit/tests/test_gpg.py
deleted file mode 100644
index 311dfad6..00000000
--- a/cloudinit/tests/test_gpg.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-"""Test gpg module."""
-
-from unittest import mock
-
-from cloudinit import gpg
-from cloudinit import subp
-from cloudinit.tests.helpers import CiTestCase
-
-
-@mock.patch("cloudinit.gpg.time.sleep")
-@mock.patch("cloudinit.gpg.subp.subp")
-class TestReceiveKeys(CiTestCase):
- """Test the recv_key method."""
-
- def test_retries_on_subp_exc(self, m_subp, m_sleep):
- """retry should be done on gpg receive keys failure."""
- retries = (1, 2, 4)
- my_exc = subp.ProcessExecutionError(
- stdout='', stderr='', exit_code=2, cmd=['mycmd'])
- m_subp.side_effect = (my_exc, my_exc, ('', ''))
- gpg.recv_key("ABCD", "keyserver.example.com", retries=retries)
- self.assertEqual([mock.call(1), mock.call(2)], m_sleep.call_args_list)
-
- def test_raises_error_after_retries(self, m_subp, m_sleep):
- """If the final run fails, error should be raised."""
- naplen = 1
- keyid, keyserver = ("ABCD", "keyserver.example.com")
- m_subp.side_effect = subp.ProcessExecutionError(
- stdout='', stderr='', exit_code=2, cmd=['mycmd'])
- with self.assertRaises(ValueError) as rcm:
- gpg.recv_key(keyid, keyserver, retries=(naplen,))
- self.assertIn(keyid, str(rcm.exception))
- self.assertIn(keyserver, str(rcm.exception))
- m_sleep.assert_called_with(naplen)
-
- def test_no_retries_on_none(self, m_subp, m_sleep):
- """retry should not be done if retries is None."""
- m_subp.side_effect = subp.ProcessExecutionError(
- stdout='', stderr='', exit_code=2, cmd=['mycmd'])
- with self.assertRaises(ValueError):
- gpg.recv_key("ABCD", "keyserver.example.com", retries=None)
- m_sleep.assert_not_called()
-
- def test_expected_gpg_command(self, m_subp, m_sleep):
- """Verify gpg is called with expected args."""
- key, keyserver = ("DEADBEEF", "keyserver.example.com")
- retries = (1, 2, 4)
- m_subp.return_value = ('', '')
- gpg.recv_key(key, keyserver, retries=retries)
- m_subp.assert_called_once_with(
- ['gpg', '--no-tty',
- '--keyserver=%s' % keyserver, '--recv-keys', key],
- capture=True)
- m_sleep.assert_not_called()
diff --git a/cloudinit/tests/test_netinfo.py b/cloudinit/tests/test_netinfo.py
deleted file mode 100644
index e44b16d8..00000000
--- a/cloudinit/tests/test_netinfo.py
+++ /dev/null
@@ -1,181 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests netinfo module functions and classes."""
-
-from copy import copy
-
-from cloudinit.netinfo import netdev_info, netdev_pformat, route_pformat
-from cloudinit.tests.helpers import CiTestCase, mock, readResource
-
-
-# Example ifconfig and route output
-SAMPLE_OLD_IFCONFIG_OUT = readResource("netinfo/old-ifconfig-output")
-SAMPLE_NEW_IFCONFIG_OUT = readResource("netinfo/new-ifconfig-output")
-SAMPLE_FREEBSD_IFCONFIG_OUT = readResource("netinfo/freebsd-ifconfig-output")
-SAMPLE_IPADDRSHOW_OUT = readResource("netinfo/sample-ipaddrshow-output")
-SAMPLE_ROUTE_OUT_V4 = readResource("netinfo/sample-route-output-v4")
-SAMPLE_ROUTE_OUT_V6 = readResource("netinfo/sample-route-output-v6")
-SAMPLE_IPROUTE_OUT_V4 = readResource("netinfo/sample-iproute-output-v4")
-SAMPLE_IPROUTE_OUT_V6 = readResource("netinfo/sample-iproute-output-v6")
-NETDEV_FORMATTED_OUT = readResource("netinfo/netdev-formatted-output")
-ROUTE_FORMATTED_OUT = readResource("netinfo/route-formatted-output")
-FREEBSD_NETDEV_OUT = readResource("netinfo/freebsd-netdev-formatted-output")
-
-
-class TestNetInfo(CiTestCase):
-
- maxDiff = None
- with_logs = True
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_netdev_old_nettools_pformat(self, m_subp, m_which):
- """netdev_pformat properly rendering old nettools info."""
- m_subp.return_value = (SAMPLE_OLD_IFCONFIG_OUT, '')
- m_which.side_effect = lambda x: x if x == 'ifconfig' else None
- content = netdev_pformat()
- self.assertEqual(NETDEV_FORMATTED_OUT, content)
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_netdev_new_nettools_pformat(self, m_subp, m_which):
- """netdev_pformat properly rendering netdev new nettools info."""
- m_subp.return_value = (SAMPLE_NEW_IFCONFIG_OUT, '')
- m_which.side_effect = lambda x: x if x == 'ifconfig' else None
- content = netdev_pformat()
- self.assertEqual(NETDEV_FORMATTED_OUT, content)
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_netdev_freebsd_nettools_pformat(self, m_subp, m_which):
- """netdev_pformat properly rendering netdev new nettools info."""
- m_subp.return_value = (SAMPLE_FREEBSD_IFCONFIG_OUT, '')
- m_which.side_effect = lambda x: x if x == 'ifconfig' else None
- content = netdev_pformat()
- print()
- print(content)
- print()
- self.assertEqual(FREEBSD_NETDEV_OUT, content)
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_netdev_iproute_pformat(self, m_subp, m_which):
- """netdev_pformat properly rendering ip route info."""
- m_subp.return_value = (SAMPLE_IPADDRSHOW_OUT, '')
- m_which.side_effect = lambda x: x if x == 'ip' else None
- content = netdev_pformat()
- new_output = copy(NETDEV_FORMATTED_OUT)
- # ip route show describes global scopes on ipv4 addresses
- # whereas ifconfig does not. Add proper global/host scope to output.
- new_output = new_output.replace('| . | 50:7b', '| global | 50:7b')
- new_output = new_output.replace(
- '255.0.0.0 | . |', '255.0.0.0 | host |')
- self.assertEqual(new_output, content)
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_netdev_warn_on_missing_commands(self, m_subp, m_which):
- """netdev_pformat warns when missing both ip and 'netstat'."""
- m_which.return_value = None # Neither ip nor netstat found
- content = netdev_pformat()
- self.assertEqual('\n', content)
- self.assertEqual(
- "WARNING: Could not print networks: missing 'ip' and 'ifconfig'"
- " commands\n",
- self.logs.getvalue())
- m_subp.assert_not_called()
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_netdev_info_nettools_down(self, m_subp, m_which):
- """test netdev_info using nettools and down interfaces."""
- m_subp.return_value = (
- readResource("netinfo/new-ifconfig-output-down"), "")
- m_which.side_effect = lambda x: x if x == 'ifconfig' else None
- self.assertEqual(
- {'eth0': {'ipv4': [], 'ipv6': [],
- 'hwaddr': '00:16:3e:de:51:a6', 'up': False},
- 'lo': {'ipv4': [{'ip': '127.0.0.1', 'mask': '255.0.0.0'}],
- 'ipv6': [{'ip': '::1/128', 'scope6': 'host'}],
- 'hwaddr': '.', 'up': True}},
- netdev_info("."))
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_netdev_info_iproute_down(self, m_subp, m_which):
- """Test netdev_info with ip and down interfaces."""
- m_subp.return_value = (
- readResource("netinfo/sample-ipaddrshow-output-down"), "")
- m_which.side_effect = lambda x: x if x == 'ip' else None
- self.assertEqual(
- {'lo': {'ipv4': [{'ip': '127.0.0.1', 'bcast': '.',
- 'mask': '255.0.0.0', 'scope': 'host'}],
- 'ipv6': [{'ip': '::1/128', 'scope6': 'host'}],
- 'hwaddr': '.', 'up': True},
- 'eth0': {'ipv4': [], 'ipv6': [],
- 'hwaddr': '00:16:3e:de:51:a6', 'up': False}},
- netdev_info("."))
-
- @mock.patch('cloudinit.netinfo.netdev_info')
- def test_netdev_pformat_with_down(self, m_netdev_info):
- """test netdev_pformat when netdev_info returns 'down' interfaces."""
- m_netdev_info.return_value = (
- {'lo': {'ipv4': [{'ip': '127.0.0.1', 'mask': '255.0.0.0',
- 'scope': 'host'}],
- 'ipv6': [{'ip': '::1/128', 'scope6': 'host'}],
- 'hwaddr': '.', 'up': True},
- 'eth0': {'ipv4': [], 'ipv6': [],
- 'hwaddr': '00:16:3e:de:51:a6', 'up': False}})
- self.assertEqual(
- readResource("netinfo/netdev-formatted-output-down"),
- netdev_pformat())
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_route_nettools_pformat(self, m_subp, m_which):
- """route_pformat properly rendering nettools route info."""
-
- def subp_netstat_route_selector(*args, **kwargs):
- if args[0] == ['netstat', '--route', '--numeric', '--extend']:
- return (SAMPLE_ROUTE_OUT_V4, '')
- if args[0] == ['netstat', '-A', 'inet6', '--route', '--numeric']:
- return (SAMPLE_ROUTE_OUT_V6, '')
- raise Exception('Unexpected subp call %s' % args[0])
-
- m_subp.side_effect = subp_netstat_route_selector
- m_which.side_effect = lambda x: x if x == 'netstat' else None
- content = route_pformat()
- self.assertEqual(ROUTE_FORMATTED_OUT, content)
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_route_iproute_pformat(self, m_subp, m_which):
- """route_pformat properly rendering ip route info."""
-
- def subp_iproute_selector(*args, **kwargs):
- if ['ip', '-o', 'route', 'list'] == args[0]:
- return (SAMPLE_IPROUTE_OUT_V4, '')
- v6cmd = ['ip', '--oneline', '-6', 'route', 'list', 'table', 'all']
- if v6cmd == args[0]:
- return (SAMPLE_IPROUTE_OUT_V6, '')
- raise Exception('Unexpected subp call %s' % args[0])
-
- m_subp.side_effect = subp_iproute_selector
- m_which.side_effect = lambda x: x if x == 'ip' else None
- content = route_pformat()
- self.assertEqual(ROUTE_FORMATTED_OUT, content)
-
- @mock.patch('cloudinit.netinfo.subp.which')
- @mock.patch('cloudinit.netinfo.subp.subp')
- def test_route_warn_on_missing_commands(self, m_subp, m_which):
- """route_pformat warns when missing both ip and 'netstat'."""
- m_which.return_value = None # Neither ip nor netstat found
- content = route_pformat()
- self.assertEqual('\n', content)
- self.assertEqual(
- "WARNING: Could not print routes: missing 'ip' and 'netstat'"
- " commands\n",
- self.logs.getvalue())
- m_subp.assert_not_called()
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/tests/test_persistence.py b/cloudinit/tests/test_persistence.py
deleted file mode 100644
index ec1152a9..00000000
--- a/cloudinit/tests/test_persistence.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Copyright (C) 2020 Canonical Ltd.
-#
-# Author: Daniel Watkins <oddbloke@ubuntu.com>
-#
-# This file is part of cloud-init. See LICENSE file for license information.
-"""
-Tests for cloudinit.persistence.
-
-Per https://docs.python.org/3/library/pickle.html, only "classes that are
-defined at the top level of a module" can be pickled. This means that all of
-our ``CloudInitPickleMixin`` subclasses for testing must be defined at
-module-level (rather than being defined inline or dynamically in the body of
-test methods, as we would do without this constraint).
-
-``TestPickleMixin.test_subclasses`` iterates over a list of all of these
-classes, and tests that they round-trip through a pickle dump/load. As the
-interface we're testing is that ``_unpickle`` is called appropriately on
-subclasses, our subclasses define their assertions in their ``_unpickle``
-implementation. (This means that the assertions will not be executed if
-``_unpickle`` is not called at all; we have
-``TestPickleMixin.test_unpickle_called`` to ensure it is called.)
-
-To avoid manually maintaining a list of classes for parametrization we use a
-simple metaclass, ``_Collector``, to gather them up.
-"""
-
-import pickle
-from unittest import mock
-
-import pytest
-
-from cloudinit.persistence import CloudInitPickleMixin
-
-
-class _Collector(type):
- """Any class using this as a metaclass will be stored in test_classes."""
-
- test_classes = []
-
- def __new__(cls, *args):
- new_cls = super().__new__(cls, *args)
- _Collector.test_classes.append(new_cls)
- return new_cls
-
-
-class InstanceVersionNotUsed(CloudInitPickleMixin, metaclass=_Collector):
- """Test that the class version is used over one set in instance state."""
-
- _ci_pkl_version = 1
-
- def __init__(self):
- self._ci_pkl_version = 2
-
- def _unpickle(self, ci_pkl_version: int) -> None:
- assert 1 == ci_pkl_version
-
-
-class MissingVersionHandled(CloudInitPickleMixin, metaclass=_Collector):
- """Test that pickles without ``_ci_pkl_version`` are handled gracefully.
-
- This is tested by overriding ``__getstate__`` so the dumped pickle of this
- class will not have ``_ci_pkl_version`` included.
- """
-
- def __getstate__(self):
- return self.__dict__
-
- def _unpickle(self, ci_pkl_version: int) -> None:
- assert 0 == ci_pkl_version
-
-
-class OverridenVersionHonored(CloudInitPickleMixin, metaclass=_Collector):
- """Test that the subclass's version is used."""
-
- _ci_pkl_version = 1
-
- def _unpickle(self, ci_pkl_version: int) -> None:
- assert 1 == ci_pkl_version
-
-
-class StateIsRestored(CloudInitPickleMixin, metaclass=_Collector):
- """Instance state should be restored before ``_unpickle`` is called."""
-
- def __init__(self):
- self.some_state = "some state"
-
- def _unpickle(self, ci_pkl_version: int) -> None:
- assert "some state" == self.some_state
-
-
-class UnpickleCanBeUnoverriden(CloudInitPickleMixin, metaclass=_Collector):
- """Subclasses should not need to override ``_unpickle``."""
-
-
-class VersionDefaultsToZero(CloudInitPickleMixin, metaclass=_Collector):
- """Test that the default version is 0."""
-
- def _unpickle(self, ci_pkl_version: int) -> None:
- assert 0 == ci_pkl_version
-
-
-class VersionIsPoppedFromState(CloudInitPickleMixin, metaclass=_Collector):
- """Test _ci_pkl_version is popped from state before being restored."""
-
- def _unpickle(self, ci_pkl_version: int) -> None:
- # `self._ci_pkl_version` returns the type's _ci_pkl_version if it isn't
- # in instance state, so we need to explicitly check self.__dict__.
- assert "_ci_pkl_version" not in self.__dict__
-
-
-class TestPickleMixin:
- def test_unpickle_called(self):
- """Test that self._unpickle is called on unpickle."""
- with mock.patch.object(
- CloudInitPickleMixin, "_unpickle"
- ) as m_unpickle:
- pickle.loads(pickle.dumps(CloudInitPickleMixin()))
- assert 1 == m_unpickle.call_count
-
- @pytest.mark.parametrize("cls", _Collector.test_classes)
- def test_subclasses(self, cls):
- """For each collected class, round-trip through pickle dump/load.
-
- Assertions are implemented in ``cls._unpickle``, and so are evoked as
- part of the pickle load.
- """
- pickle.loads(pickle.dumps(cls()))
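All of the collected classes above exercise one contract, which may be easier to see in a single non-test example. A hedged sketch with hypothetical class and attribute names: a top-level class mixes in CloudInitPickleMixin, bumps _ci_pkl_version whenever its pickled layout changes, and back-fills missing attributes in _unpickle() when an older pickle is loaded.

import pickle

from cloudinit.persistence import CloudInitPickleMixin


class Record(CloudInitPickleMixin):
    _ci_pkl_version = 2            # bumped when the pickled layout changed

    def __init__(self):
        self.name = 'example'
        self.added_in_v2 = 0

    def _unpickle(self, ci_pkl_version: int) -> None:
        # Pickles written before version 2 lack the newer attribute.
        if ci_pkl_version < 2:
            self.added_in_v2 = 0


restored = pickle.loads(pickle.dumps(Record()))
assert restored.added_in_v2 == 0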
diff --git a/cloudinit/tests/test_simpletable.py b/cloudinit/tests/test_simpletable.py
deleted file mode 100644
index a12a62a0..00000000
--- a/cloudinit/tests/test_simpletable.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Copyright (C) 2017 Amazon.com, Inc. or its affiliates
-#
-# Author: Andrew Jorgensen <ajorgens@amazon.com>
-#
-# This file is part of cloud-init. See LICENSE file for license information.
-"""Tests that SimpleTable works just like PrettyTable for cloud-init.
-
-Not all possible PrettyTable cases are tested because we're not trying to
-reimplement the entire library, only the minimal parts we actually use.
-"""
-
-from cloudinit.simpletable import SimpleTable
-from cloudinit.tests.helpers import CiTestCase
-
-# Examples rendered by cloud-init using PrettyTable
-NET_DEVICE_FIELDS = (
- 'Device', 'Up', 'Address', 'Mask', 'Scope', 'Hw-Address')
-NET_DEVICE_ROWS = (
- ('ens3', True, '172.31.4.203', '255.255.240.0', '.', '0a:1f:07:15:98:70'),
- ('ens3', True, 'fe80::81f:7ff:fe15:9870/64', '.', 'link',
- '0a:1f:07:15:98:70'),
- ('lo', True, '127.0.0.1', '255.0.0.0', '.', '.'),
- ('lo', True, '::1/128', '.', 'host', '.'),
-)
-NET_DEVICE_TABLE = """\
-+--------+------+----------------------------+---------------+-------+-------------------+
-| Device | Up | Address | Mask | Scope | Hw-Address |
-+--------+------+----------------------------+---------------+-------+-------------------+
-| ens3 | True | 172.31.4.203 | 255.255.240.0 | . | 0a:1f:07:15:98:70 |
-| ens3 | True | fe80::81f:7ff:fe15:9870/64 | . | link | 0a:1f:07:15:98:70 |
-| lo | True | 127.0.0.1 | 255.0.0.0 | . | . |
-| lo | True | ::1/128 | . | host | . |
-+--------+------+----------------------------+---------------+-------+-------------------+""" # noqa: E501
-ROUTE_IPV4_FIELDS = (
- 'Route', 'Destination', 'Gateway', 'Genmask', 'Interface', 'Flags')
-ROUTE_IPV4_ROWS = (
- ('0', '0.0.0.0', '172.31.0.1', '0.0.0.0', 'ens3', 'UG'),
- ('1', '169.254.0.0', '0.0.0.0', '255.255.0.0', 'ens3', 'U'),
- ('2', '172.31.0.0', '0.0.0.0', '255.255.240.0', 'ens3', 'U'),
-)
-ROUTE_IPV4_TABLE = """\
-+-------+-------------+------------+---------------+-----------+-------+
-| Route | Destination | Gateway | Genmask | Interface | Flags |
-+-------+-------------+------------+---------------+-----------+-------+
-| 0 | 0.0.0.0 | 172.31.0.1 | 0.0.0.0 | ens3 | UG |
-| 1 | 169.254.0.0 | 0.0.0.0 | 255.255.0.0 | ens3 | U |
-| 2 | 172.31.0.0 | 0.0.0.0 | 255.255.240.0 | ens3 | U |
-+-------+-------------+------------+---------------+-----------+-------+"""
-
-AUTHORIZED_KEYS_FIELDS = (
- 'Keytype', 'Fingerprint (md5)', 'Options', 'Comment')
-AUTHORIZED_KEYS_ROWS = (
- ('ssh-rsa', '24:c7:41:49:47:12:31:a0:de:6f:62:79:9b:13:06:36', '-',
- 'ajorgens'),
-)
-AUTHORIZED_KEYS_TABLE = """\
-+---------+-------------------------------------------------+---------+----------+
-| Keytype | Fingerprint (md5) | Options | Comment |
-+---------+-------------------------------------------------+---------+----------+
-| ssh-rsa | 24:c7:41:49:47:12:31:a0:de:6f:62:79:9b:13:06:36 | - | ajorgens |
-+---------+-------------------------------------------------+---------+----------+""" # noqa: E501
-
-# from prettytable import PrettyTable
-# pt = PrettyTable(('HEADER',))
-# print(pt)
-NO_ROWS_FIELDS = ('HEADER',)
-NO_ROWS_TABLE = """\
-+--------+
-| HEADER |
-+--------+
-+--------+"""
-
-
-class TestSimpleTable(CiTestCase):
-
- def test_no_rows(self):
- """An empty table is rendered as PrettyTable would have done it."""
- table = SimpleTable(NO_ROWS_FIELDS)
- self.assertEqual(str(table), NO_ROWS_TABLE)
-
- def test_net_dev(self):
- """Net device info is rendered as it was with PrettyTable."""
- table = SimpleTable(NET_DEVICE_FIELDS)
- for row in NET_DEVICE_ROWS:
- table.add_row(row)
- self.assertEqual(str(table), NET_DEVICE_TABLE)
-
- def test_route_ipv4(self):
- """Route IPv4 info is rendered as it was with PrettyTable."""
- table = SimpleTable(ROUTE_IPV4_FIELDS)
- for row in ROUTE_IPV4_ROWS:
- table.add_row(row)
- self.assertEqual(str(table), ROUTE_IPV4_TABLE)
-
- def test_authorized_keys(self):
- """SSH authorized keys are rendered as they were with PrettyTable."""
- table = SimpleTable(AUTHORIZED_KEYS_FIELDS)
- for row in AUTHORIZED_KEYS_ROWS:
- table.add_row(row)
-
- def test_get_string(self):
- """get_string() method returns the same content as str()."""
- table = SimpleTable(AUTHORIZED_KEYS_FIELDS)
- for row in AUTHORIZED_KEYS_ROWS:
- table.add_row(row)
- self.assertEqual(table.get_string(), str(table))
diff --git a/cloudinit/tests/test_stages.py b/cloudinit/tests/test_stages.py
deleted file mode 100644
index d2d1b37f..00000000
--- a/cloudinit/tests/test_stages.py
+++ /dev/null
@@ -1,406 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests related to cloudinit.stages module."""
-
-import os
-import stat
-
-import pytest
-
-from cloudinit import stages
-from cloudinit import sources
-from cloudinit.sources import NetworkConfigSource
-
-from cloudinit.event import EventType
-from cloudinit.util import write_file
-
-from cloudinit.tests.helpers import CiTestCase, mock
-
-TEST_INSTANCE_ID = 'i-testing'
-
-
-class FakeDataSource(sources.DataSource):
-
- def __init__(self, paths=None, userdata=None, vendordata=None,
- network_config=''):
- super(FakeDataSource, self).__init__({}, None, paths=paths)
- self.metadata = {'instance-id': TEST_INSTANCE_ID}
- self.userdata_raw = userdata
- self.vendordata_raw = vendordata
- self._network_config = None
- if network_config: # Permit for None value to setup attribute
- self._network_config = network_config
-
- @property
- def network_config(self):
- return self._network_config
-
- def _get_data(self):
- return True
-
-
-class TestInit(CiTestCase):
- with_logs = True
- allowed_subp = False
-
- def setUp(self):
- super(TestInit, self).setUp()
- self.tmpdir = self.tmp_dir()
- self.init = stages.Init()
- # Setup fake Paths for Init to reference
- self.init._cfg = {'system_info': {
- 'distro': 'ubuntu', 'paths': {'cloud_dir': self.tmpdir,
- 'run_dir': self.tmpdir}}}
- self.init.datasource = FakeDataSource(paths=self.init.paths)
-
- def test_wb__find_networking_config_disabled(self):
- """find_networking_config returns no config when disabled."""
- disable_file = os.path.join(
- self.init.paths.get_cpath('data'), 'upgraded-network')
- write_file(disable_file, '')
- self.assertEqual(
- (None, disable_file),
- self.init._find_networking_config())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_disabled_by_kernel(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns when disabled by kernel cmdline."""
- m_cmdline.return_value = {'config': 'disabled'}
- m_initramfs.return_value = {'config': ['fake_initrd']}
- self.assertEqual(
- (None, NetworkConfigSource.cmdline),
- self.init._find_networking_config())
- self.assertEqual('DEBUG: network config disabled by cmdline\n',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_disabled_by_initrd(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns when disabled by kernel cmdline."""
- m_cmdline.return_value = {}
- m_initramfs.return_value = {'config': 'disabled'}
- self.assertEqual(
- (None, NetworkConfigSource.initramfs),
- self.init._find_networking_config())
- self.assertEqual('DEBUG: network config disabled by initramfs\n',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_disabled_by_datasrc(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns when disabled by datasource cfg."""
- m_cmdline.return_value = {} # Kernel doesn't disable networking
- m_initramfs.return_value = {} # initramfs doesn't disable networking
- self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}},
- 'network': {}} # system config doesn't disable
-
- self.init.datasource = FakeDataSource(
- network_config={'config': 'disabled'})
- self.assertEqual(
- (None, NetworkConfigSource.ds),
- self.init._find_networking_config())
- self.assertEqual('DEBUG: network config disabled by ds\n',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_disabled_by_sysconfig(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns when disabled by system config."""
- m_cmdline.return_value = {} # Kernel doesn't disable networking
- m_initramfs.return_value = {} # initramfs doesn't disable networking
- self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}},
- 'network': {'config': 'disabled'}}
- self.assertEqual(
- (None, NetworkConfigSource.system_cfg),
- self.init._find_networking_config())
- self.assertEqual('DEBUG: network config disabled by system_cfg\n',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test__find_networking_config_uses_datasrc_order(
- self, m_cmdline, m_initramfs):
- """find_networking_config should check sources in DS defined order"""
- # cmdline and initramfs, which would normally be preferred over other
- # sources, disable networking; in this case, though, the DS moves them
- # later so its own config is preferred
- m_cmdline.return_value = {'config': 'disabled'}
- m_initramfs.return_value = {'config': 'disabled'}
-
- ds_net_cfg = {'config': {'needle': True}}
- self.init.datasource = FakeDataSource(network_config=ds_net_cfg)
- self.init.datasource.network_config_sources = [
- NetworkConfigSource.ds, NetworkConfigSource.system_cfg,
- NetworkConfigSource.cmdline, NetworkConfigSource.initramfs]
-
- self.assertEqual(
- (ds_net_cfg, NetworkConfigSource.ds),
- self.init._find_networking_config())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test__find_networking_config_warns_if_datasrc_uses_invalid_src(
- self, m_cmdline, m_initramfs):
- """find_networking_config should check sources in DS defined order"""
- ds_net_cfg = {'config': {'needle': True}}
- self.init.datasource = FakeDataSource(network_config=ds_net_cfg)
- self.init.datasource.network_config_sources = [
- 'invalid_src', NetworkConfigSource.ds]
-
- self.assertEqual(
- (ds_net_cfg, NetworkConfigSource.ds),
- self.init._find_networking_config())
- self.assertIn('WARNING: data source specifies an invalid network'
- ' cfg_source: invalid_src',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test__find_networking_config_warns_if_datasrc_uses_unavailable_src(
- self, m_cmdline, m_initramfs):
- """find_networking_config should check sources in DS defined order"""
- ds_net_cfg = {'config': {'needle': True}}
- self.init.datasource = FakeDataSource(network_config=ds_net_cfg)
- self.init.datasource.network_config_sources = [
- NetworkConfigSource.fallback, NetworkConfigSource.ds]
-
- self.assertEqual(
- (ds_net_cfg, NetworkConfigSource.ds),
- self.init._find_networking_config())
- self.assertIn('WARNING: data source specifies an unavailable network'
- ' cfg_source: fallback',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_returns_kernel(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns kernel cmdline config if present."""
- expected_cfg = {'config': ['fakekernel']}
- m_cmdline.return_value = expected_cfg
- m_initramfs.return_value = {'config': ['fake_initrd']}
- self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}},
- 'network': {'config': ['fakesys_config']}}
- self.init.datasource = FakeDataSource(
- network_config={'config': ['fakedatasource']})
- self.assertEqual(
- (expected_cfg, NetworkConfigSource.cmdline),
- self.init._find_networking_config())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_returns_initramfs(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns kernel cmdline config if present."""
- expected_cfg = {'config': ['fake_initrd']}
- m_cmdline.return_value = {}
- m_initramfs.return_value = expected_cfg
- self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}},
- 'network': {'config': ['fakesys_config']}}
- self.init.datasource = FakeDataSource(
- network_config={'config': ['fakedatasource']})
- self.assertEqual(
- (expected_cfg, NetworkConfigSource.initramfs),
- self.init._find_networking_config())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_returns_system_cfg(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns system config when present."""
- m_cmdline.return_value = {} # No kernel network config
- m_initramfs.return_value = {} # no initramfs network config
- expected_cfg = {'config': ['fakesys_config']}
- self.init._cfg = {'system_info': {'paths': {'cloud_dir': self.tmpdir}},
- 'network': expected_cfg}
- self.init.datasource = FakeDataSource(
- network_config={'config': ['fakedatasource']})
- self.assertEqual(
- (expected_cfg, NetworkConfigSource.system_cfg),
- self.init._find_networking_config())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_returns_datasrc_cfg(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns datasource net config if present."""
- m_cmdline.return_value = {} # No kernel network config
- m_initramfs.return_value = {} # no initramfs network config
- # No system config for network in setUp
- expected_cfg = {'config': ['fakedatasource']}
- self.init.datasource = FakeDataSource(network_config=expected_cfg)
- self.assertEqual(
- (expected_cfg, NetworkConfigSource.ds),
- self.init._find_networking_config())
-
- @mock.patch('cloudinit.stages.cmdline.read_initramfs_config')
- @mock.patch('cloudinit.stages.cmdline.read_kernel_cmdline_config')
- def test_wb__find_networking_config_returns_fallback(
- self, m_cmdline, m_initramfs):
- """find_networking_config returns fallback config if not defined."""
- m_cmdline.return_value = {} # Kernel doesn't disable networking
- m_initramfs.return_value = {} # no initramfs network config
- # Neither datasource nor system_info disable or provide network
-
- fake_cfg = {'config': [{'type': 'physical', 'name': 'eth9'}],
- 'version': 1}
-
- def fake_generate_fallback():
- return fake_cfg
-
- # Monkey patch distro which gets cached on self.init
- distro = self.init.distro
- distro.generate_fallback_config = fake_generate_fallback
- self.assertEqual(
- (fake_cfg, NetworkConfigSource.fallback),
- self.init._find_networking_config())
- self.assertNotIn('network config disabled', self.logs.getvalue())
-
- def test_apply_network_config_disabled(self):
- """Log when network is disabled by upgraded-network."""
- disable_file = os.path.join(
- self.init.paths.get_cpath('data'), 'upgraded-network')
-
- def fake_network_config():
- return (None, disable_file)
-
- self.init._find_networking_config = fake_network_config
-
- self.init.apply_network_config(True)
- self.assertIn(
- 'INFO: network config is disabled by %s' % disable_file,
- self.logs.getvalue())
-
- @mock.patch('cloudinit.net.get_interfaces_by_mac')
- @mock.patch('cloudinit.distros.ubuntu.Distro')
- def test_apply_network_on_new_instance(self, m_ubuntu, m_macs):
- """Call distro apply_network_config methods on is_new_instance."""
- net_cfg = {
- 'version': 1, 'config': [
- {'subnets': [{'type': 'dhcp'}], 'type': 'physical',
- 'name': 'eth9', 'mac_address': '42:42:42:42:42:42'}]}
-
- def fake_network_config():
- return net_cfg, NetworkConfigSource.fallback
-
- m_macs.return_value = {'42:42:42:42:42:42': 'eth9'}
-
- self.init._find_networking_config = fake_network_config
- self.init.apply_network_config(True)
- self.init.distro.apply_network_config_names.assert_called_with(net_cfg)
- self.init.distro.apply_network_config.assert_called_with(
- net_cfg, bring_up=True)
-
- @mock.patch('cloudinit.distros.ubuntu.Distro')
- def test_apply_network_on_same_instance_id(self, m_ubuntu):
- """Only call distro.apply_network_config_names on same instance id."""
- old_instance_id = os.path.join(
- self.init.paths.get_cpath('data'), 'instance-id')
- write_file(old_instance_id, TEST_INSTANCE_ID)
- net_cfg = {
- 'version': 1, 'config': [
- {'subnets': [{'type': 'dhcp'}], 'type': 'physical',
- 'name': 'eth9', 'mac_address': '42:42:42:42:42:42'}]}
-
- def fake_network_config():
- return net_cfg, NetworkConfigSource.fallback
-
- self.init._find_networking_config = fake_network_config
- self.init.apply_network_config(True)
- self.init.distro.apply_network_config_names.assert_called_with(net_cfg)
- self.init.distro.apply_network_config.assert_not_called()
- self.assertIn(
- 'No network config applied. Neither a new instance'
- " nor datasource network update on '%s' event" % EventType.BOOT,
- self.logs.getvalue())
-
- @mock.patch('cloudinit.net.get_interfaces_by_mac')
- @mock.patch('cloudinit.distros.ubuntu.Distro')
- def test_apply_network_on_datasource_allowed_event(self, m_ubuntu, m_macs):
- """Apply network if datasource.update_metadata permits BOOT event."""
- old_instance_id = os.path.join(
- self.init.paths.get_cpath('data'), 'instance-id')
- write_file(old_instance_id, TEST_INSTANCE_ID)
- net_cfg = {
- 'version': 1, 'config': [
- {'subnets': [{'type': 'dhcp'}], 'type': 'physical',
- 'name': 'eth9', 'mac_address': '42:42:42:42:42:42'}]}
-
- def fake_network_config():
- return net_cfg, NetworkConfigSource.fallback
-
- m_macs.return_value = {'42:42:42:42:42:42': 'eth9'}
-
- self.init._find_networking_config = fake_network_config
- self.init.datasource = FakeDataSource(paths=self.init.paths)
- self.init.datasource.update_events = {'network': [EventType.BOOT]}
- self.init.apply_network_config(True)
- self.init.distro.apply_network_config_names.assert_called_with(net_cfg)
- self.init.distro.apply_network_config.assert_called_with(
- net_cfg, bring_up=True)
-
-
-class TestInit_InitializeFilesystem:
- """Tests for cloudinit.stages.Init._initialize_filesystem.
-
- TODO: Expand these tests to cover all of _initialize_filesystem's behavior.
- """
-
- @pytest.yield_fixture
- def init(self, paths):
- """A fixture which yields a stages.Init instance with paths and cfg set
-
- Because Init.cfg is replaced with a PropertyMock, consumers of this
- fixture can set `init.cfg` if the default empty dict configuration is
- not appropriate.
- """
- with mock.patch(
- "cloudinit.stages.Init.cfg", mock.PropertyMock(return_value={})
- ):
- with mock.patch("cloudinit.stages.util.ensure_dirs"):
- init = stages.Init()
- init._paths = paths
- yield init
-
- @mock.patch("cloudinit.stages.util.ensure_file")
- def test_ensure_file_not_called_if_no_log_file_configured(
- self, m_ensure_file, init
- ):
- """If no log file is configured, we should not ensure its existence."""
- init.cfg = {}
-
- init._initialize_filesystem()
-
- assert 0 == m_ensure_file.call_count
-
- def test_log_files_existence_is_ensured_if_configured(self, init, tmpdir):
- """If a log file is configured, we should ensure its existence."""
- log_file = tmpdir.join("cloud-init.log")
- init.cfg = {"def_log_file": str(log_file)}
-
- init._initialize_filesystem()
-
- assert log_file.exists()
-
- def test_existing_file_permissions_are_not_modified(self, init, tmpdir):
- """If the log file already exists, we should not modify its permissions
-
- See https://bugs.launchpad.net/cloud-init/+bug/1900837.
- """
- # Use a mode that will never be made the default so this test will
- # always be valid
- mode = 0o606
- log_file = tmpdir.join("cloud-init.log")
- log_file.ensure()
- log_file.chmod(mode)
- init.cfg = {"def_log_file": str(log_file)}
-
- init._initialize_filesystem()
-
- assert mode == stat.S_IMODE(log_file.stat().mode)
-
-# vi: ts=4 expandtab
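The TestInit network tests above all exercise the precedence that Init._find_networking_config applies to network configuration sources: kernel cmdline, then initramfs, then system config, then the datasource, with the datasource able to reorder the search and a distro fallback used when nothing else provides a config. Below is a minimal, self-contained sketch of that precedence logic; the real implementation lives in cloudinit/stages.py, and find_networking_config/available_cfgs here are illustrative names, not the actual API.

from enum import Enum

class NetworkConfigSource(Enum):
    cmdline = "cmdline"
    initramfs = "initramfs"
    system_cfg = "system_cfg"
    ds = "ds"
    fallback = "fallback"

DEFAULT_ORDER = (
    NetworkConfigSource.cmdline,
    NetworkConfigSource.initramfs,
    NetworkConfigSource.system_cfg,
    NetworkConfigSource.ds,
)

def find_networking_config(available_cfgs, ds_order=None, fallback_cfg=None):
    """Return (config, source) from the first source providing a config."""
    for src in (ds_order or DEFAULT_ORDER):
        if not isinstance(src, NetworkConfigSource):
            print("WARNING: invalid network cfg_source: %s" % src)
            continue
        cfg = available_cfgs.get(src) or {}
        if cfg.get("config") == "disabled":
            return None, src  # this source explicitly disables networking
        if cfg.get("config"):
            return cfg, src   # first source with a real config wins
    return fallback_cfg, NetworkConfigSource.fallback

# The datasource config wins once cmdline and initramfs offer nothing.
cfgs = {NetworkConfigSource.ds: {"config": {"needle": True}}}
print(find_networking_config(cfgs))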
diff --git a/cloudinit/tests/test_subp.py b/cloudinit/tests/test_subp.py
deleted file mode 100644
index 911c1f3d..00000000
--- a/cloudinit/tests/test_subp.py
+++ /dev/null
@@ -1,286 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests for cloudinit.subp utility functions"""
-
-import json
-import os
-import sys
-import stat
-
-from unittest import mock
-
-from cloudinit import subp, util
-from cloudinit.tests.helpers import CiTestCase
-
-
-BASH = subp.which('bash')
-BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'
-
-
-class TestPrependBaseCommands(CiTestCase):
-
- with_logs = True
-
- def test_prepend_base_command_errors_on_neither_string_nor_list(self):
- """Raise an error for each command which is not a string or list."""
- orig_commands = ['ls', 1, {'not': 'gonna work'}, ['basecmd', 'list']]
- with self.assertRaises(TypeError) as context_manager:
- subp.prepend_base_command(
- base_command='basecmd', commands=orig_commands)
- self.assertEqual(
- "Invalid basecmd config. These commands are not a string or"
- " list:\n1\n{'not': 'gonna work'}",
- str(context_manager.exception))
-
- def test_prepend_base_command_warns_on_non_base_string_commands(self):
- """Warn on each non-base for commands of type string."""
- orig_commands = [
- 'ls', 'basecmd list', 'touch /blah', 'basecmd install x']
- fixed_commands = subp.prepend_base_command(
- base_command='basecmd', commands=orig_commands)
- self.assertEqual(
- 'WARNING: Non-basecmd commands in basecmd config:\n'
- 'ls\ntouch /blah\n',
- self.logs.getvalue())
- self.assertEqual(orig_commands, fixed_commands)
-
- def test_prepend_base_command_prepends_on_non_base_list_commands(self):
- """Prepend 'basecmd' for each non-basecmd command of type list."""
- orig_commands = [['ls'], ['basecmd', 'list'], ['basecmda', '/blah'],
- ['basecmd', 'install', 'x']]
- expected = [['basecmd', 'ls'], ['basecmd', 'list'],
- ['basecmd', 'basecmda', '/blah'],
- ['basecmd', 'install', 'x']]
- fixed_commands = subp.prepend_base_command(
- base_command='basecmd', commands=orig_commands)
- self.assertEqual('', self.logs.getvalue())
- self.assertEqual(expected, fixed_commands)
-
- def test_prepend_base_command_removes_first_item_when_none(self):
- """Remove the first element of a non-basecmd when it is None."""
- orig_commands = [[None, 'ls'], ['basecmd', 'list'],
- [None, 'touch', '/blah'],
- ['basecmd', 'install', 'x']]
- expected = [['ls'], ['basecmd', 'list'],
- ['touch', '/blah'],
- ['basecmd', 'install', 'x']]
- fixed_commands = subp.prepend_base_command(
- base_command='basecmd', commands=orig_commands)
- self.assertEqual('', self.logs.getvalue())
- self.assertEqual(expected, fixed_commands)
-
-
-class TestSubp(CiTestCase):
- allowed_subp = [BASH, 'cat', CiTestCase.SUBP_SHELL_TRUE,
- BOGUS_COMMAND, sys.executable]
-
- stdin2err = [BASH, '-c', 'cat >&2']
- stdin2out = ['cat']
- utf8_invalid = b'ab\xaadef'
- utf8_valid = b'start \xc3\xa9 end'
- utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
- printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
-
- def printf_cmd(self, *args):
- # bash's printf supports \xaa. So does /usr/bin/printf
- # but by using bash, we remove dependency on another program.
- return [BASH, '-c', 'printf "$@"', 'printf'] + list(args)
-
- def test_subp_handles_bytestrings(self):
- """subp can run a bytestring command if shell is True."""
- tmp_file = self.tmp_path('test.out')
- cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
- (out, _err) = subp.subp(cmd.encode('utf-8'), shell=True)
- self.assertEqual(u'', out)
- self.assertEqual(u'', _err)
- self.assertEqual('HI MOM\n', util.load_file(tmp_file))
-
- def test_subp_handles_strings(self):
- """subp can run a string command if shell is True."""
- tmp_file = self.tmp_path('test.out')
- cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
- (out, _err) = subp.subp(cmd, shell=True)
- self.assertEqual(u'', out)
- self.assertEqual(u'', _err)
- self.assertEqual('HI MOM\n', util.load_file(tmp_file))
-
- def test_subp_handles_utf8(self):
- # The given bytes contain utf-8 accented characters as seen in e.g.
- # the "deja dup" package in Ubuntu.
- cmd = self.printf_cmd(self.utf8_valid_2)
- (out, _err) = subp.subp(cmd, capture=True)
- self.assertEqual(out, self.utf8_valid_2.decode('utf-8'))
-
- def test_subp_respects_decode_false(self):
- (out, err) = subp.subp(self.stdin2out, capture=True, decode=False,
- data=self.utf8_valid)
- self.assertTrue(isinstance(out, bytes))
- self.assertTrue(isinstance(err, bytes))
- self.assertEqual(out, self.utf8_valid)
-
- def test_subp_decode_ignore(self):
- # this executes a string that writes invalid utf-8 to stdout
- (out, _err) = subp.subp(self.printf_cmd('abc\\xaadef'),
- capture=True, decode='ignore')
- self.assertEqual(out, 'abcdef')
-
- def test_subp_decode_strict_valid_utf8(self):
- (out, _err) = subp.subp(self.stdin2out, capture=True,
- decode='strict', data=self.utf8_valid)
- self.assertEqual(out, self.utf8_valid.decode('utf-8'))
-
- def test_subp_decode_invalid_utf8_replaces(self):
- (out, _err) = subp.subp(self.stdin2out, capture=True,
- data=self.utf8_invalid)
- expected = self.utf8_invalid.decode('utf-8', 'replace')
- self.assertEqual(out, expected)
-
- def test_subp_decode_strict_raises(self):
- args = []
- kwargs = {'args': self.stdin2out, 'capture': True,
- 'decode': 'strict', 'data': self.utf8_invalid}
- self.assertRaises(UnicodeDecodeError, subp.subp, *args, **kwargs)
-
- def test_subp_capture_stderr(self):
- data = b'hello world'
- (out, err) = subp.subp(self.stdin2err, capture=True,
- decode=False, data=data,
- update_env={'LC_ALL': 'C'})
- self.assertEqual(err, data)
- self.assertEqual(out, b'')
-
- def test_subp_reads_env(self):
- with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
- out, _err = subp.subp(self.printenv + ['FOO'], capture=True)
- self.assertEqual('FOO=BAR', out.splitlines()[0])
-
- def test_subp_env_and_update_env(self):
- out, _err = subp.subp(
- self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
- env={'FOO': 'BAR'},
- update_env={'HOME': '/myhome', 'K2': 'V2'})
- self.assertEqual(
- ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines())
-
- def test_subp_update_env(self):
- extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
- with mock.patch.dict("os.environ", values=extra):
- out, _err = subp.subp(
- self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
- update_env={'HOME': '/myhome', 'K2': 'V2'})
-
- self.assertEqual(
- ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())
-
- def test_subp_warn_missing_shebang(self):
- """Warn on no #! in script"""
- noshebang = self.tmp_path('noshebang')
- util.write_file(noshebang, 'true\n')
-
- print("os is %s" % os)
- os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
- with self.allow_subp([noshebang]):
- self.assertRaisesRegex(subp.ProcessExecutionError,
- r'Missing #! in script\?',
- subp.subp, (noshebang,))
-
- def test_subp_combined_stderr_stdout(self):
- """Providing combine_capture as True redirects stderr to stdout."""
- data = b'hello world'
- (out, err) = subp.subp(self.stdin2err, capture=True,
- combine_capture=True, decode=False, data=data)
- self.assertEqual(b'', err)
- self.assertEqual(data, out)
-
- def test_returns_none_if_no_capture(self):
- (out, err) = subp.subp(self.stdin2out, data=b'', capture=False)
- self.assertIsNone(err)
- self.assertIsNone(out)
-
- def test_exception_has_out_err_are_bytes_if_decode_false(self):
- """Raised exc should have stderr, stdout as bytes if no decode."""
- with self.assertRaises(subp.ProcessExecutionError) as cm:
- subp.subp([BOGUS_COMMAND], decode=False)
- self.assertTrue(isinstance(cm.exception.stdout, bytes))
- self.assertTrue(isinstance(cm.exception.stderr, bytes))
-
- def test_exception_has_out_err_are_bytes_if_decode_true(self):
- """Raised exc should have stderr, stdout as string if no decode."""
- with self.assertRaises(subp.ProcessExecutionError) as cm:
- subp.subp([BOGUS_COMMAND], decode=True)
- self.assertTrue(isinstance(cm.exception.stdout, str))
- self.assertTrue(isinstance(cm.exception.stderr, str))
-
- def test_bunch_of_slashes_in_path(self):
- self.assertEqual("/target/my/path/",
- subp.target_path("/target/", "//my/path/"))
- self.assertEqual("/target/my/path/",
- subp.target_path("/target/", "///my/path/"))
-
- def test_c_lang_can_take_utf8_args(self):
- """Independent of system LC_CTYPE, args can contain utf-8 strings.
-
- When python starts up, its default encoding gets set based on
- the value of LC_CTYPE. If no system locale is set, the default
- encoding for both python2 and python3 in some paths will end up
- being ascii.
-
- Attempts to use setlocale or patching (or changing) os.environ
- in the current environment seem to not be effective.
-
- This test starts up a python with LC_CTYPE set to C so that
- the default encoding will be set to ascii. In such an environment
- Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError.
- """
- python_prog = '\n'.join([
- 'import json, sys',
- 'from cloudinit.subp import subp',
- 'data = sys.stdin.read()',
- 'cmd = json.loads(data)',
- 'subp(cmd, capture=False)',
- ''])
- cmd = [BASH, '-c', 'echo -n "$@"', '--',
- self.utf8_valid.decode("utf-8")]
- python_subp = [sys.executable, '-c', python_prog]
-
- out, _err = subp.subp(
- python_subp, update_env={'LC_CTYPE': 'C'},
- data=json.dumps(cmd).encode("utf-8"),
- decode=False)
- self.assertEqual(self.utf8_valid, out)
-
- def test_bogus_command_logs_status_messages(self):
- """status_cb gets status messages logs on bogus commands provided."""
- logs = []
-
- def status_cb(log):
- logs.append(log)
-
- with self.assertRaises(subp.ProcessExecutionError):
- subp.subp([BOGUS_COMMAND], status_cb=status_cb)
-
- expected = [
- 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
- 'ERROR: End run command: invalid command provided\n']
- self.assertEqual(expected, logs)
-
- def test_command_logs_exit_codes_to_status_cb(self):
- """status_cb gets status messages containing command exit code."""
- logs = []
-
- def status_cb(log):
- logs.append(log)
-
- with self.assertRaises(subp.ProcessExecutionError):
- subp.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
- subp.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)
-
- expected = [
- 'Begin run command: %s -c exit 2\n' % BASH,
- 'ERROR: End run command: exit(2)\n',
- 'Begin run command: %s -c exit 0\n' % BASH,
- 'End run command: exit(0)\n']
- self.assertEqual(expected, logs)
-
-
-# vi: ts=4 expandtab
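The TestPrependBaseCommands cases document subp.prepend_base_command's contract: list commands that do not already start with the base command get it prepended, a leading None element is dropped, and bare string commands are left alone (with a warning when they do not start with the base command). A short usage sketch, assuming the cloudinit package is importable; the values mirror the test fixtures:

from cloudinit import subp

commands = [["ls"], ["basecmd", "list"], [None, "touch", "/blah"], "basecmd install x"]
fixed = subp.prepend_base_command(base_command="basecmd", commands=commands)
# -> [['basecmd', 'ls'], ['basecmd', 'list'], ['touch', '/blah'], 'basecmd install x']
print(fixed)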
diff --git a/cloudinit/tests/test_temp_utils.py b/cloudinit/tests/test_temp_utils.py
deleted file mode 100644
index 4a52ef89..00000000
--- a/cloudinit/tests/test_temp_utils.py
+++ /dev/null
@@ -1,117 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests for cloudinit.temp_utils"""
-
-from cloudinit.temp_utils import mkdtemp, mkstemp, tempdir
-from cloudinit.tests.helpers import CiTestCase, wrap_and_call
-import os
-
-
-class TestTempUtils(CiTestCase):
-
- def test_mkdtemp_default_non_root(self):
- """mkdtemp creates a dir under /tmp for the unprivileged."""
- calls = []
-
- def fake_mkdtemp(*args, **kwargs):
- calls.append(kwargs)
- return '/fake/return/path'
-
- retval = wrap_and_call(
- 'cloudinit.temp_utils',
- {'os.getuid': 1000,
- 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
- '_TMPDIR': {'new': None},
- 'os.path.isdir': True},
- mkdtemp)
- self.assertEqual('/fake/return/path', retval)
- self.assertEqual([{'dir': '/tmp'}], calls)
-
- def test_mkdtemp_default_non_root_needs_exe(self):
- """mkdtemp creates a dir under /var/tmp/cloud-init when needs_exe."""
- calls = []
-
- def fake_mkdtemp(*args, **kwargs):
- calls.append(kwargs)
- return '/fake/return/path'
-
- retval = wrap_and_call(
- 'cloudinit.temp_utils',
- {'os.getuid': 1000,
- 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
- '_TMPDIR': {'new': None},
- 'os.path.isdir': True},
- mkdtemp, needs_exe=True)
- self.assertEqual('/fake/return/path', retval)
- self.assertEqual([{'dir': '/var/tmp/cloud-init'}], calls)
-
- def test_mkdtemp_default_root(self):
- """mkdtemp creates a dir under /run/cloud-init for the privileged."""
- calls = []
-
- def fake_mkdtemp(*args, **kwargs):
- calls.append(kwargs)
- return '/fake/return/path'
-
- retval = wrap_and_call(
- 'cloudinit.temp_utils',
- {'os.getuid': 0,
- 'tempfile.mkdtemp': {'side_effect': fake_mkdtemp},
- '_TMPDIR': {'new': None},
- 'os.path.isdir': True},
- mkdtemp)
- self.assertEqual('/fake/return/path', retval)
- self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls)
-
- def test_mkstemp_default_non_root(self):
- """mkstemp creates secure tempfile under /tmp for the unprivileged."""
- calls = []
-
- def fake_mkstemp(*args, **kwargs):
- calls.append(kwargs)
- return '/fake/return/path'
-
- retval = wrap_and_call(
- 'cloudinit.temp_utils',
- {'os.getuid': 1000,
- 'tempfile.mkstemp': {'side_effect': fake_mkstemp},
- '_TMPDIR': {'new': None},
- 'os.path.isdir': True},
- mkstemp)
- self.assertEqual('/fake/return/path', retval)
- self.assertEqual([{'dir': '/tmp'}], calls)
-
- def test_mkstemp_default_root(self):
- """mkstemp creates a secure tempfile in /run/cloud-init for root."""
- calls = []
-
- def fake_mkstemp(*args, **kwargs):
- calls.append(kwargs)
- return '/fake/return/path'
-
- retval = wrap_and_call(
- 'cloudinit.temp_utils',
- {'os.getuid': 0,
- 'tempfile.mkstemp': {'side_effect': fake_mkstemp},
- '_TMPDIR': {'new': None},
- 'os.path.isdir': True},
- mkstemp)
- self.assertEqual('/fake/return/path', retval)
- self.assertEqual([{'dir': '/run/cloud-init/tmp'}], calls)
-
- def test_tempdir_error_suppression(self):
- """test tempdir suppresses errors during directory removal."""
-
- with self.assertRaises(OSError):
- with tempdir(prefix='cloud-init-dhcp-') as tdir:
- os.rmdir(tdir)
- # As a result, the directory is already gone,
- # so shutil.rmtree should raise OSError
-
- with tempdir(rmtree_ignore_errors=True,
- prefix='cloud-init-dhcp-') as tdir:
- os.rmdir(tdir)
- # Since the directory is already gone, shutil.rmtree would raise
- # OSError, but we suppress that
-
-# vi: ts=4 expandtab
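The temp_utils tests above pin down where temporary files land (/tmp for unprivileged callers, /run/cloud-init/tmp for root, /var/tmp/cloud-init when an executable file is needed) and that the tempdir context manager can be told to ignore cleanup errors. A short usage sketch, assuming cloud-init is importable:

import os
from cloudinit.temp_utils import tempdir

# Removing the directory inside the context would normally make the
# cleanup rmtree fail; rmtree_ignore_errors=True suppresses that error.
with tempdir(rmtree_ignore_errors=True, prefix="cloud-init-example-") as tdir:
    os.rmdir(tdir)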
diff --git a/cloudinit/tests/test_upgrade.py b/cloudinit/tests/test_upgrade.py
deleted file mode 100644
index f79a2536..00000000
--- a/cloudinit/tests/test_upgrade.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# Copyright (C) 2020 Canonical Ltd.
-#
-# Author: Daniel Watkins <oddbloke@ubuntu.com>
-#
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Upgrade testing for cloud-init.
-
-This module tests cloud-init's behaviour across upgrades. Specifically, it
-specifies a set of invariants that the current codebase expects to be true (as
-tests in ``TestUpgrade``) and then checks that these hold true after unpickling
-``obj.pkl``s from previous versions of cloud-init; those pickles are stored in
-``tests/data/old_pickles/``.
-"""
-
-import operator
-import pathlib
-
-import pytest
-
-from cloudinit.stages import _pkl_load
-from cloudinit.tests.helpers import resourceLocation
-
-
-class TestUpgrade:
- @pytest.fixture(
- params=pathlib.Path(resourceLocation("old_pickles")).glob("*.pkl"),
- scope="class",
- ids=operator.attrgetter("name"),
- )
- def previous_obj_pkl(self, request):
- """Load each pickle to memory once, then run all tests against it.
-
- Test implementations _must not_ modify the ``previous_obj_pkl`` which
- they are passed, as that will affect tests that run after them.
- """
- return _pkl_load(str(request.param))
-
- def test_networking_set_on_distro(self, previous_obj_pkl):
- """We always expect to have ``.networking`` on ``Distro`` objects."""
- assert previous_obj_pkl.distro.networking is not None
-
- def test_blacklist_drivers_set_on_networking(self, previous_obj_pkl):
- """We always expect Networking.blacklist_drivers to be initialised."""
- assert previous_obj_pkl.distro.networking.blacklist_drivers is None
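Checking a new invariant outside the test class follows the same pattern the previous_obj_pkl fixture wraps up: load a stored pickle with _pkl_load and assert against it without mutating it. A minimal sketch, assuming it runs from a source checkout where tests/data/old_pickles/ is present:

import pathlib

from cloudinit.stages import _pkl_load
from cloudinit.tests.helpers import resourceLocation

# Load one historical pickle and check the same invariant the class does.
pkl = next(pathlib.Path(resourceLocation("old_pickles")).glob("*.pkl"))
ds = _pkl_load(str(pkl))
assert ds.distro.networking is not None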
diff --git a/cloudinit/tests/test_url_helper.py b/cloudinit/tests/test_url_helper.py
deleted file mode 100644
index 364ec822..00000000
--- a/cloudinit/tests/test_url_helper.py
+++ /dev/null
@@ -1,174 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from cloudinit.url_helper import (
- NOT_FOUND, UrlError, REDACTED, oauth_headers, read_file_or_url,
- retry_on_url_exc)
-from cloudinit.tests.helpers import CiTestCase, mock, skipIf
-from cloudinit import util
-from cloudinit import version
-
-import httpretty
-import requests
-
-
-try:
- import oauthlib
- assert oauthlib # avoid pyflakes error F401: import unused
- _missing_oauthlib_dep = False
-except ImportError:
- _missing_oauthlib_dep = True
-
-
-M_PATH = 'cloudinit.url_helper.'
-
-
-class TestOAuthHeaders(CiTestCase):
-
- def test_oauth_headers_raises_not_implemented_when_oathlib_missing(self):
- """oauth_headers raises a NotImplemented error when oauth absent."""
- with mock.patch.dict('sys.modules', {'oauthlib': None}):
- with self.assertRaises(NotImplementedError) as context_manager:
- oauth_headers(1, 2, 3, 4, 5)
- self.assertEqual(
- 'oauth support is not available',
- str(context_manager.exception))
-
- @skipIf(_missing_oauthlib_dep, "No python-oauthlib dependency")
- @mock.patch('oauthlib.oauth1.Client')
- def test_oauth_headers_calls_oathlibclient_when_available(self, m_client):
- """oauth_headers calls oaut1.hClient.sign with the provided url."""
- class fakeclient(object):
- def sign(self, url):
- # The first and third items of the client.sign tuple are ignored
- return ('junk', url, 'junk2')
-
- m_client.return_value = fakeclient()
-
- return_value = oauth_headers(
- 'url', 'consumer_key', 'token_key', 'token_secret',
- 'consumer_secret')
- self.assertEqual('url', return_value)
-
-
-class TestReadFileOrUrl(CiTestCase):
-
- with_logs = True
-
- def test_read_file_or_url_str_from_file(self):
- """Test that str(result.contents) on file is text version of contents.
- It should not be "b'data'", but just "'data'" """
- tmpf = self.tmp_path("myfile1")
- data = b'This is my file content\n'
- util.write_file(tmpf, data, omode="wb")
- result = read_file_or_url("file://%s" % tmpf)
- self.assertEqual(result.contents, data)
- self.assertEqual(str(result), data.decode('utf-8'))
-
- @httpretty.activate
- def test_read_file_or_url_str_from_url(self):
- """Test that str(result.contents) on url is text version of contents.
- It should not be "b'data'", but just "'data'" """
- url = 'http://hostname/path'
- data = b'This is my url content\n'
- httpretty.register_uri(httpretty.GET, url, data)
- result = read_file_or_url(url)
- self.assertEqual(result.contents, data)
- self.assertEqual(str(result), data.decode('utf-8'))
-
- @httpretty.activate
- def test_read_file_or_url_str_from_url_redacting_headers_from_logs(self):
- """Headers are redacted from logs but unredacted in requests."""
- url = 'http://hostname/path'
- headers = {'sensitive': 'sekret', 'server': 'blah'}
- httpretty.register_uri(httpretty.GET, url)
-
- read_file_or_url(url, headers=headers, headers_redact=['sensitive'])
- logs = self.logs.getvalue()
- for k in headers.keys():
- self.assertEqual(headers[k], httpretty.last_request().headers[k])
- self.assertIn(REDACTED, logs)
- self.assertNotIn('sekret', logs)
-
- @httpretty.activate
- def test_read_file_or_url_str_from_url_redacts_noheaders(self):
- """When no headers_redact, header values are in logs and requests."""
- url = 'http://hostname/path'
- headers = {'sensitive': 'sekret', 'server': 'blah'}
- httpretty.register_uri(httpretty.GET, url)
-
- read_file_or_url(url, headers=headers)
- for k in headers.keys():
- self.assertEqual(headers[k], httpretty.last_request().headers[k])
- logs = self.logs.getvalue()
- self.assertNotIn(REDACTED, logs)
- self.assertIn('sekret', logs)
-
- @mock.patch(M_PATH + 'readurl')
- def test_read_file_or_url_passes_params_to_readurl(self, m_readurl):
- """read_file_or_url passes all params through to readurl."""
- url = 'http://hostname/path'
- response = 'This is my url content\n'
- m_readurl.return_value = response
- params = {'url': url, 'timeout': 1, 'retries': 2,
- 'headers': {'somehdr': 'val'},
- 'data': 'data', 'sec_between': 1,
- 'ssl_details': {'cert_file': '/path/cert.pem'},
- 'headers_cb': 'headers_cb', 'exception_cb': 'exception_cb'}
- self.assertEqual(response, read_file_or_url(**params))
- params.pop('url') # url is passed in as a positional arg
- self.assertEqual([mock.call(url, **params)], m_readurl.call_args_list)
-
- def test_wb_read_url_defaults_honored_by_read_file_or_url_callers(self):
- """Readurl param defaults used when unspecified by read_file_or_url
-
- Param defaults tested are as follows:
- retries: 0, no additional headers beyond the default, method: GET,
- data: None, check_status: True and allow_redirects: True
- """
- url = 'http://hostname/path'
-
- m_response = mock.MagicMock()
-
- class FakeSession(requests.Session):
- @classmethod
- def request(cls, **kwargs):
- self.assertEqual(
- {'url': url, 'allow_redirects': True, 'method': 'GET',
- 'headers': {
- 'User-Agent': 'Cloud-Init/%s' % (
- version.version_string())}},
- kwargs)
- return m_response
-
- with mock.patch(M_PATH + 'requests.Session') as m_session:
- error = requests.exceptions.HTTPError('broke')
- m_session.side_effect = [error, FakeSession()]
- # assert no retries and check_status == True
- with self.assertRaises(UrlError) as context_manager:
- response = read_file_or_url(url)
- self.assertEqual('broke', str(context_manager.exception))
- # assert default headers, method, url and allow_redirects True
- # Success on 2nd call with FakeSession
- response = read_file_or_url(url)
- self.assertEqual(m_response, response._response)
-
-
-class TestRetryOnUrlExc(CiTestCase):
-
- def test_do_not_retry_non_urlerror(self):
- """When exception is not UrlError return False."""
- myerror = IOError('something unexpected')
- self.assertFalse(retry_on_url_exc(msg='', exc=myerror))
-
- def test_perform_retries_on_not_found(self):
- """When exception is UrlError with a 404 status code return True."""
- myerror = UrlError(cause=RuntimeError(
- 'something was not found'), code=NOT_FOUND)
- self.assertTrue(retry_on_url_exc(msg='', exc=myerror))
-
- def test_perform_retries_on_timeout(self):
- """When exception is a requests.Timout return True."""
- myerror = UrlError(cause=requests.Timeout('something timed out'))
- self.assertTrue(retry_on_url_exc(msg='', exc=myerror))
-
-# vi: ts=4 expandtab
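read_file_or_url, covered above, returns the same response-like object for file:// paths as for HTTP URLs, so str() on the result yields decoded text while .contents stays bytes. A short usage sketch, assuming cloud-init is importable; the /tmp path is only an example:

from cloudinit import util
from cloudinit.url_helper import read_file_or_url

util.write_file("/tmp/example-url-helper", b"hello\n", omode="wb")
result = read_file_or_url("file:///tmp/example-url-helper")
print(result.contents)  # b'hello\n'
print(str(result))      # 'hello\n', not the bytes repr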
diff --git a/cloudinit/tests/test_util.py b/cloudinit/tests/test_util.py
deleted file mode 100644
index b7a302f1..00000000
--- a/cloudinit/tests/test_util.py
+++ /dev/null
@@ -1,854 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests for cloudinit.util"""
-
-import base64
-import logging
-import json
-import platform
-import pytest
-
-import cloudinit.util as util
-from cloudinit import subp
-
-from cloudinit.tests.helpers import CiTestCase, mock
-from textwrap import dedent
-
-LOG = logging.getLogger(__name__)
-
-MOUNT_INFO = [
- '68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64',
- '153 68 254:0 / /home rw,relatime shared:101 - xfs /dev/sda2 rw,attr2'
-]
-
-OS_RELEASE_SLES = dedent("""\
- NAME="SLES"
- VERSION="12-SP3"
- VERSION_ID="12.3"
- PRETTY_NAME="SUSE Linux Enterprise Server 12 SP3"
- ID="sles"
- ANSI_COLOR="0;32"
- CPE_NAME="cpe:/o:suse:sles:12:sp3"
-""")
-
-OS_RELEASE_OPENSUSE = dedent("""\
- NAME="openSUSE Leap"
- VERSION="42.3"
- ID=opensuse
- ID_LIKE="suse"
- VERSION_ID="42.3"
- PRETTY_NAME="openSUSE Leap 42.3"
- ANSI_COLOR="0;32"
- CPE_NAME="cpe:/o:opensuse:leap:42.3"
- BUG_REPORT_URL="https://bugs.opensuse.org"
- HOME_URL="https://www.opensuse.org/"
-""")
-
-OS_RELEASE_OPENSUSE_L15 = dedent("""\
- NAME="openSUSE Leap"
- VERSION="15.0"
- ID="opensuse-leap"
- ID_LIKE="suse opensuse"
- VERSION_ID="15.0"
- PRETTY_NAME="openSUSE Leap 15.0"
- ANSI_COLOR="0;32"
- CPE_NAME="cpe:/o:opensuse:leap:15.0"
- BUG_REPORT_URL="https://bugs.opensuse.org"
- HOME_URL="https://www.opensuse.org/"
-""")
-
-OS_RELEASE_OPENSUSE_TW = dedent("""\
- NAME="openSUSE Tumbleweed"
- ID="opensuse-tumbleweed"
- ID_LIKE="opensuse suse"
- VERSION_ID="20180920"
- PRETTY_NAME="openSUSE Tumbleweed"
- ANSI_COLOR="0;32"
- CPE_NAME="cpe:/o:opensuse:tumbleweed:20180920"
- BUG_REPORT_URL="https://bugs.opensuse.org"
- HOME_URL="https://www.opensuse.org/"
-""")
-
-OS_RELEASE_CENTOS = dedent("""\
- NAME="CentOS Linux"
- VERSION="7 (Core)"
- ID="centos"
- ID_LIKE="rhel fedora"
- VERSION_ID="7"
- PRETTY_NAME="CentOS Linux 7 (Core)"
- ANSI_COLOR="0;31"
- CPE_NAME="cpe:/o:centos:centos:7"
- HOME_URL="https://www.centos.org/"
- BUG_REPORT_URL="https://bugs.centos.org/"
-
- CENTOS_MANTISBT_PROJECT="CentOS-7"
- CENTOS_MANTISBT_PROJECT_VERSION="7"
- REDHAT_SUPPORT_PRODUCT="centos"
- REDHAT_SUPPORT_PRODUCT_VERSION="7"
-""")
-
-OS_RELEASE_REDHAT_7 = dedent("""\
- NAME="Red Hat Enterprise Linux Server"
- VERSION="7.5 (Maipo)"
- ID="rhel"
- ID_LIKE="fedora"
- VARIANT="Server"
- VARIANT_ID="server"
- VERSION_ID="7.5"
- PRETTY_NAME="Red Hat"
- ANSI_COLOR="0;31"
- CPE_NAME="cpe:/o:redhat:enterprise_linux:7.5:GA:server"
- HOME_URL="https://www.redhat.com/"
- BUG_REPORT_URL="https://bugzilla.redhat.com/"
-
- REDHAT_BUGZILLA_PRODUCT="Red Hat Enterprise Linux 7"
- REDHAT_BUGZILLA_PRODUCT_VERSION=7.5
- REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux"
- REDHAT_SUPPORT_PRODUCT_VERSION="7.5"
-""")
-
-REDHAT_RELEASE_CENTOS_6 = "CentOS release 6.10 (Final)"
-REDHAT_RELEASE_CENTOS_7 = "CentOS Linux release 7.5.1804 (Core)"
-REDHAT_RELEASE_REDHAT_6 = (
- "Red Hat Enterprise Linux Server release 6.10 (Santiago)")
-REDHAT_RELEASE_REDHAT_7 = (
- "Red Hat Enterprise Linux Server release 7.5 (Maipo)")
-
-
-OS_RELEASE_DEBIAN = dedent("""\
- PRETTY_NAME="Debian GNU/Linux 9 (stretch)"
- NAME="Debian GNU/Linux"
- VERSION_ID="9"
- VERSION="9 (stretch)"
- ID=debian
- HOME_URL="https://www.debian.org/"
- SUPPORT_URL="https://www.debian.org/support"
- BUG_REPORT_URL="https://bugs.debian.org/"
-""")
-
-OS_RELEASE_UBUNTU = dedent("""\
- NAME="Ubuntu"\n
- # comment test
- VERSION="16.04.3 LTS (Xenial Xerus)"\n
- ID=ubuntu\n
- ID_LIKE=debian\n
- PRETTY_NAME="Ubuntu 16.04.3 LTS"\n
- VERSION_ID="16.04"\n
- HOME_URL="http://www.ubuntu.com/"\n
- SUPPORT_URL="http://help.ubuntu.com/"\n
- BUG_REPORT_URL="http://bugs.launchpad.net/ubuntu/"\n
- VERSION_CODENAME=xenial\n
- UBUNTU_CODENAME=xenial\n
-""")
-
-
-class FakeCloud(object):
-
- def __init__(self, hostname, fqdn):
- self.hostname = hostname
- self.fqdn = fqdn
- self.calls = []
-
- def get_hostname(self, fqdn=None, metadata_only=None):
- myargs = {}
- if fqdn is not None:
- myargs['fqdn'] = fqdn
- if metadata_only is not None:
- myargs['metadata_only'] = metadata_only
- self.calls.append(myargs)
- if fqdn:
- return self.fqdn
- return self.hostname
-
-
-class TestUtil(CiTestCase):
-
- def test_parse_mount_info_no_opts_no_arg(self):
- result = util.parse_mount_info('/home', MOUNT_INFO, LOG)
- self.assertEqual(('/dev/sda2', 'xfs', '/home'), result)
-
- def test_parse_mount_info_no_opts_arg(self):
- result = util.parse_mount_info('/home', MOUNT_INFO, LOG, False)
- self.assertEqual(('/dev/sda2', 'xfs', '/home'), result)
-
- def test_parse_mount_info_with_opts(self):
- result = util.parse_mount_info('/', MOUNT_INFO, LOG, True)
- self.assertEqual(
- ('/dev/sda1', 'btrfs', '/', 'ro,relatime'),
- result
- )
-
- @mock.patch('cloudinit.util.get_mount_info')
- def test_mount_is_rw(self, m_mount_info):
- m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'rw,relatime')
- is_rw = util.mount_is_read_write('/')
- self.assertEqual(is_rw, True)
-
- @mock.patch('cloudinit.util.get_mount_info')
- def test_mount_is_ro(self, m_mount_info):
- m_mount_info.return_value = ('/dev/sda1', 'btrfs', '/', 'ro,relatime')
- is_rw = util.mount_is_read_write('/')
- self.assertEqual(is_rw, False)
-
-
-class TestUptime(CiTestCase):
-
- @mock.patch('cloudinit.util.boottime')
- @mock.patch('cloudinit.util.os.path.exists')
- @mock.patch('cloudinit.util.time.time')
- def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime):
- boottime = 1000.0
- uptime = 10.0
- m_boottime.return_value = boottime
- m_time.return_value = boottime + uptime
- m_exists.return_value = False
- result = util.uptime()
- self.assertEqual(str(uptime), result)
-
-
-class TestShellify(CiTestCase):
-
- def test_input_dict_raises_type_error(self):
- self.assertRaisesRegex(
- TypeError, 'Input.*was.*dict.*xpected',
- util.shellify, {'mykey': 'myval'})
-
- def test_input_str_raises_type_error(self):
- self.assertRaisesRegex(
- TypeError, 'Input.*was.*str.*xpected', util.shellify, "foobar")
-
- def test_value_with_int_raises_type_error(self):
- self.assertRaisesRegex(
- TypeError, 'shellify.*int', util.shellify, ["foo", 1])
-
- def test_supports_strings_and_lists(self):
- self.assertEqual(
- '\n'.join(["#!/bin/sh", "echo hi mom", "'echo' 'hi dad'",
- "'echo' 'hi' 'sis'", ""]),
- util.shellify(["echo hi mom", ["echo", "hi dad"],
- ('echo', 'hi', 'sis')]))
-
-
-class TestGetHostnameFqdn(CiTestCase):
-
- def test_get_hostname_fqdn_from_only_cfg_fqdn(self):
- """When cfg only has the fqdn key, derive hostname and fqdn from it."""
- hostname, fqdn = util.get_hostname_fqdn(
- cfg={'fqdn': 'myhost.domain.com'}, cloud=None)
- self.assertEqual('myhost', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
-
- def test_get_hostname_fqdn_from_cfg_fqdn_and_hostname(self):
- """When cfg has both fqdn and hostname keys, return them."""
- hostname, fqdn = util.get_hostname_fqdn(
- cfg={'fqdn': 'myhost.domain.com', 'hostname': 'other'}, cloud=None)
- self.assertEqual('other', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
-
- def test_get_hostname_fqdn_from_cfg_hostname_with_domain(self):
- """When cfg has only hostname key which represents a fqdn, use that."""
- hostname, fqdn = util.get_hostname_fqdn(
- cfg={'hostname': 'myhost.domain.com'}, cloud=None)
- self.assertEqual('myhost', hostname)
- self.assertEqual('myhost.domain.com', fqdn)
-
- def test_get_hostname_fqdn_from_cfg_hostname_without_domain(self):
- """When cfg has a hostname without a '.' query cloud.get_hostname."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
- hostname, fqdn = util.get_hostname_fqdn(
- cfg={'hostname': 'myhost'}, cloud=mycloud)
- self.assertEqual('myhost', hostname)
- self.assertEqual('cloudhost.mycloud.com', fqdn)
- self.assertEqual(
- [{'fqdn': True, 'metadata_only': False}], mycloud.calls)
-
- def test_get_hostname_fqdn_from_without_fqdn_or_hostname(self):
- """When cfg has neither hostname nor fqdn cloud.get_hostname."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
- hostname, fqdn = util.get_hostname_fqdn(cfg={}, cloud=mycloud)
- self.assertEqual('cloudhost', hostname)
- self.assertEqual('cloudhost.mycloud.com', fqdn)
- self.assertEqual(
- [{'fqdn': True, 'metadata_only': False},
- {'metadata_only': False}], mycloud.calls)
-
- def test_get_hostname_fqdn_from_passes_metadata_only_to_cloud(self):
- """Calls to cloud.get_hostname pass the metadata_only parameter."""
- mycloud = FakeCloud('cloudhost', 'cloudhost.mycloud.com')
- _hn, _fqdn = util.get_hostname_fqdn(
- cfg={}, cloud=mycloud, metadata_only=True)
- self.assertEqual(
- [{'fqdn': True, 'metadata_only': True},
- {'metadata_only': True}], mycloud.calls)
-
-
-class TestBlkid(CiTestCase):
- ids = {
- "id01": "1111-1111",
- "id02": "22222222-2222",
- "id03": "33333333-3333",
- "id04": "44444444-4444",
- "id05": "55555555-5555-5555-5555-555555555555",
- "id06": "66666666-6666-6666-6666-666666666666",
- "id07": "52894610484658920398",
- "id08": "86753098675309867530",
- "id09": "99999999-9999-9999-9999-999999999999",
- }
-
- blkid_out = dedent("""\
- /dev/loop0: TYPE="squashfs"
- /dev/loop1: TYPE="squashfs"
- /dev/loop2: TYPE="squashfs"
- /dev/loop3: TYPE="squashfs"
- /dev/sda1: UUID="{id01}" TYPE="vfat" PARTUUID="{id02}"
- /dev/sda2: UUID="{id03}" TYPE="ext4" PARTUUID="{id04}"
- /dev/sda3: UUID="{id05}" TYPE="ext4" PARTUUID="{id06}"
- /dev/sda4: LABEL="default" UUID="{id07}" UUID_SUB="{id08}" """
- """TYPE="zfs_member" PARTUUID="{id09}"
- /dev/loop4: TYPE="squashfs"
- """)
-
- maxDiff = None
-
- def _get_expected(self):
- return ({
- "/dev/loop0": {"DEVNAME": "/dev/loop0", "TYPE": "squashfs"},
- "/dev/loop1": {"DEVNAME": "/dev/loop1", "TYPE": "squashfs"},
- "/dev/loop2": {"DEVNAME": "/dev/loop2", "TYPE": "squashfs"},
- "/dev/loop3": {"DEVNAME": "/dev/loop3", "TYPE": "squashfs"},
- "/dev/loop4": {"DEVNAME": "/dev/loop4", "TYPE": "squashfs"},
- "/dev/sda1": {"DEVNAME": "/dev/sda1", "TYPE": "vfat",
- "UUID": self.ids["id01"],
- "PARTUUID": self.ids["id02"]},
- "/dev/sda2": {"DEVNAME": "/dev/sda2", "TYPE": "ext4",
- "UUID": self.ids["id03"],
- "PARTUUID": self.ids["id04"]},
- "/dev/sda3": {"DEVNAME": "/dev/sda3", "TYPE": "ext4",
- "UUID": self.ids["id05"],
- "PARTUUID": self.ids["id06"]},
- "/dev/sda4": {"DEVNAME": "/dev/sda4", "TYPE": "zfs_member",
- "LABEL": "default",
- "UUID": self.ids["id07"],
- "UUID_SUB": self.ids["id08"],
- "PARTUUID": self.ids["id09"]},
- })
-
- @mock.patch("cloudinit.subp.subp")
- def test_functional_blkid(self, m_subp):
- m_subp.return_value = (
- self.blkid_out.format(**self.ids), "")
- self.assertEqual(self._get_expected(), util.blkid())
- m_subp.assert_called_with(["blkid", "-o", "full"], capture=True,
- decode="replace")
-
- @mock.patch("cloudinit.subp.subp")
- def test_blkid_no_cache_uses_no_cache(self, m_subp):
- """blkid should turn off cache if disable_cache is true."""
- m_subp.return_value = (
- self.blkid_out.format(**self.ids), "")
- self.assertEqual(self._get_expected(),
- util.blkid(disable_cache=True))
- m_subp.assert_called_with(["blkid", "-o", "full", "-c", "/dev/null"],
- capture=True, decode="replace")
-
-
-@mock.patch('cloudinit.subp.subp')
-class TestUdevadmSettle(CiTestCase):
- def test_with_no_params(self, m_subp):
- """called with no parameters."""
- util.udevadm_settle()
- m_subp.assert_called_once_with(['udevadm', 'settle'])
-
- def test_with_exists_and_not_exists(self, m_subp):
- """with exists=file where file does not exist should invoke subp."""
- mydev = self.tmp_path("mydev")
- util.udevadm_settle(exists=mydev)
- m_subp.assert_called_once_with(
- ['udevadm', 'settle', '--exit-if-exists=%s' % mydev])
-
- def test_with_exists_and_file_exists(self, m_subp):
- """with exists=file where file does exist should not invoke subp."""
- mydev = self.tmp_path("mydev")
- util.write_file(mydev, "foo\n")
- util.udevadm_settle(exists=mydev)
- self.assertIsNone(m_subp.call_args)
-
- def test_with_timeout_int(self, m_subp):
- """timeout can be an integer."""
- timeout = 9
- util.udevadm_settle(timeout=timeout)
- m_subp.assert_called_once_with(
- ['udevadm', 'settle', '--timeout=%s' % timeout])
-
- def test_with_timeout_string(self, m_subp):
- """timeout can be a string."""
- timeout = "555"
- util.udevadm_settle(timeout=timeout)
- m_subp.assert_called_once_with(
- ['udevadm', 'settle', '--timeout=%s' % timeout])
-
- def test_with_exists_and_timeout(self, m_subp):
- """test call with both exists and timeout."""
- mydev = self.tmp_path("mydev")
- timeout = "3"
- util.udevadm_settle(exists=mydev, timeout=timeout)
- m_subp.assert_called_once_with(
- ['udevadm', 'settle', '--exit-if-exists=%s' % mydev,
- '--timeout=%s' % timeout])
-
- def test_subp_exception_raises_to_caller(self, m_subp):
- m_subp.side_effect = subp.ProcessExecutionError("BOOM")
- self.assertRaises(subp.ProcessExecutionError, util.udevadm_settle)
-
-
-@mock.patch('os.path.exists')
-class TestGetLinuxDistro(CiTestCase):
-
- def setUp(self):
- # python2 has no lru_cache, and therefore, no cache_clear()
- if hasattr(util.get_linux_distro, "cache_clear"):
- util.get_linux_distro.cache_clear()
-
- @classmethod
- def os_release_exists(self, path):
- """Side effect function"""
- if path == '/etc/os-release':
- return 1
-
- @classmethod
- def redhat_release_exists(self, path):
- """Side effect function """
- if path == '/etc/redhat-release':
- return 1
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists):
- """Verify we get the correct name if the os-release file has
- the distro name in quotes"""
- m_os_release.return_value = OS_RELEASE_SLES
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('sles', '12.3', platform.machine()), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists):
- """Verify we get the correct name if the os-release file does not
- have the distro name in quotes"""
- m_os_release.return_value = OS_RELEASE_UBUNTU
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('ubuntu', '16.04', 'xenial'), dist)
-
- @mock.patch('platform.system')
- @mock.patch('platform.release')
- @mock.patch('cloudinit.util._parse_redhat_release')
- def test_get_linux_freebsd(self, m_parse_redhat_release,
- m_platform_release,
- m_platform_system, m_path_exists):
- """Verify we get the correct name and release name on FreeBSD."""
- m_path_exists.return_value = False
- m_platform_release.return_value = '12.0-RELEASE-p10'
- m_platform_system.return_value = 'FreeBSD'
- m_parse_redhat_release.return_value = {}
- util.is_BSD.cache_clear()
- dist = util.get_linux_distro()
- self.assertEqual(('freebsd', '12.0-RELEASE-p10', ''), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_centos6(self, m_os_release, m_path_exists):
- """Verify we get the correct name and release name on CentOS 6."""
- m_os_release.return_value = REDHAT_RELEASE_CENTOS_6
- m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('centos', '6.10', 'Final'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists):
- """Verify the correct release info on CentOS 7 without os-release."""
- m_os_release.return_value = REDHAT_RELEASE_CENTOS_7
- m_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('centos', '7.5.1804', 'Core'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists):
- """Verify redhat 7 read from os-release."""
- m_os_release.return_value = OS_RELEASE_REDHAT_7
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('redhat', '7.5', 'Maipo'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists):
- """Verify redhat 7 read from redhat-release."""
- m_os_release.return_value = REDHAT_RELEASE_REDHAT_7
- m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('redhat', '7.5', 'Maipo'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists):
- """Verify redhat 6 read from redhat-release."""
- m_os_release.return_value = REDHAT_RELEASE_REDHAT_6
- m_path_exists.side_effect = TestGetLinuxDistro.redhat_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('redhat', '6.10', 'Santiago'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_copr_centos(self, m_os_release, m_path_exists):
- """Verify we get the correct name and release name on COPR CentOS."""
- m_os_release.return_value = OS_RELEASE_CENTOS
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('centos', '7', 'Core'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_debian(self, m_os_release, m_path_exists):
- """Verify we get the correct name and release name on Debian."""
- m_os_release.return_value = OS_RELEASE_DEBIAN
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('debian', '9', 'stretch'), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_opensuse(self, m_os_release, m_path_exists):
- """Verify we get the correct name and machine arch on openSUSE
- prior to openSUSE Leap 15.
- """
- m_os_release.return_value = OS_RELEASE_OPENSUSE
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('opensuse', '42.3', platform.machine()), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists):
- """Verify we get the correct name and machine arch on openSUSE
- for openSUSE Leap 15.0 and later.
- """
- m_os_release.return_value = OS_RELEASE_OPENSUSE_L15
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(('opensuse-leap', '15.0', platform.machine()), dist)
-
- @mock.patch('cloudinit.util.load_file')
- def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists):
- """Verify we get the correct name and machine arch on openSUSE
- for openSUSE Tumbleweed
- """
- m_os_release.return_value = OS_RELEASE_OPENSUSE_TW
- m_path_exists.side_effect = TestGetLinuxDistro.os_release_exists
- dist = util.get_linux_distro()
- self.assertEqual(
- ('opensuse-tumbleweed', '20180920', platform.machine()), dist)
-
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
- def test_get_linux_distro_no_data(self, m_platform_dist,
- m_platform_system, m_path_exists):
- """Verify we get no information if os-release does not exist"""
- m_platform_dist.return_value = ('', '', '')
- m_platform_system.return_value = "Linux"
- m_path_exists.return_value = 0
- dist = util.get_linux_distro()
- self.assertEqual(('', '', ''), dist)
-
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
- def test_get_linux_distro_no_impl(self, m_platform_dist,
- m_platform_system, m_path_exists):
- """Verify we get an empty tuple when no information exists and
- exceptions are not propagated"""
- m_platform_dist.side_effect = Exception()
- m_platform_system.return_value = "Linux"
- m_path_exists.return_value = 0
- dist = util.get_linux_distro()
- self.assertEqual(('', '', ''), dist)
-
- @mock.patch('platform.system')
- @mock.patch('platform.dist', create=True)
- def test_get_linux_distro_plat_data(self, m_platform_dist,
- m_platform_system, m_path_exists):
- """Verify we get the correct platform information"""
- m_platform_dist.return_value = ('foo', '1.1', 'aarch64')
- m_platform_system.return_value = "Linux"
- m_path_exists.return_value = 0
- dist = util.get_linux_distro()
- self.assertEqual(('foo', '1.1', 'aarch64'), dist)
-
-
-class TestJsonDumps(CiTestCase):
- def test_is_str(self):
- """json_dumps should return a string."""
- self.assertTrue(isinstance(util.json_dumps({'abc': '123'}), str))
-
- def test_utf8(self):
- smiley = '\\ud83d\\ude03'
- self.assertEqual(
- {'smiley': smiley},
- json.loads(util.json_dumps({'smiley': smiley})))
-
- def test_non_utf8(self):
- blob = b'\xba\x03Qx-#y\xea'
- self.assertEqual(
- {'blob': 'ci-b64:' + base64.b64encode(blob).decode('utf-8')},
- json.loads(util.json_dumps({'blob': blob})))
-
-
-@mock.patch('os.path.exists')
-class TestIsLXD(CiTestCase):
-
- def test_is_lxd_true_on_sock_device(self, m_exists):
- """When lxd's /dev/lxd/sock exists, is_lxd returns true."""
- m_exists.return_value = True
- self.assertTrue(util.is_lxd())
- m_exists.assert_called_once_with('/dev/lxd/sock')
-
- def test_is_lxd_false_when_sock_device_absent(self, m_exists):
- """When lxd's /dev/lxd/sock is absent, is_lxd returns false."""
- m_exists.return_value = False
- self.assertFalse(util.is_lxd())
- m_exists.assert_called_once_with('/dev/lxd/sock')
-
-
-class TestReadCcFromCmdline:
-
- @pytest.mark.parametrize(
- "cmdline,expected_cfg",
- [
- # Return None if cmdline has no cc:<YAML>end_cc content.
- (CiTestCase.random_string(), None),
- # Return None if YAML content is empty string.
- ('foo cc: end_cc bar', None),
- # Return expected dictionary without trailing end_cc marker.
- ('foo cc: ssh_pwauth: true', {'ssh_pwauth': True}),
- # Return expected dictionary w escaped newline and no end_cc.
- ('foo cc: ssh_pwauth: true\\n', {'ssh_pwauth': True}),
- # Return expected dictionary of yaml between cc: and end_cc.
- ('foo cc: ssh_pwauth: true end_cc bar', {'ssh_pwauth': True}),
- # Return dict with list value w escaped newline, no end_cc.
- (
- 'cc: ssh_import_id: [smoser, kirkland]\\n',
- {'ssh_import_id': ['smoser', 'kirkland']}
- ),
- # Parse urlencoded brackets in yaml content.
- (
- 'cc: ssh_import_id: %5Bsmoser, kirkland%5D end_cc',
- {'ssh_import_id': ['smoser', 'kirkland']}
- ),
- # Parse complete urlencoded yaml content.
- (
- 'cc: ssh_import_id%3A%20%5Buser1%2C%20user2%5D end_cc',
- {'ssh_import_id': ['user1', 'user2']}
- ),
- # Parse nested dictionary in yaml content.
- (
- 'cc: ntp: {enabled: true, ntp_client: myclient} end_cc',
- {'ntp': {'enabled': True, 'ntp_client': 'myclient'}}
- ),
- # Parse single mapping value in yaml content.
- ('cc: ssh_import_id: smoser end_cc', {'ssh_import_id': 'smoser'}),
- # Parse multiline content with multiple mapping and nested lists.
- (
- ('cc: ssh_import_id: [smoser, bob]\\n'
- 'runcmd: [ [ ls, -l ], echo hi ] end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi']}
- ),
- # Parse multiline encoded content w/ mappings and nested lists.
- (
- ('cc: ssh_import_id: %5Bsmoser, bob%5D\\n'
- 'runcmd: [ [ ls, -l ], echo hi ] end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi']}
- ),
- # test encoded escaped newlines work.
- #
- # unquote(encoded_content)
- # 'ssh_import_id: [smoser, bob]\\nruncmd: [ [ ls, -l ], echo hi ]'
- (
- ('cc: ' +
- ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%5Cn'
- 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
- '%20echo%20hi%20%5D') + ' end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi']}
- ),
- # test encoded newlines work.
- #
- # unquote(encoded_content)
- # 'ssh_import_id: [smoser, bob]\nruncmd: [ [ ls, -l ], echo hi ]'
- (
- ("cc: " +
- ('ssh_import_id%3A%20%5Bsmoser%2C%20bob%5D%0A'
- 'runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%2C'
- '%20echo%20hi%20%5D') + ' end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l'], 'echo hi']}
- ),
- # Parse and merge multiple yaml content sections.
- (
- ('cc:ssh_import_id: [smoser, bob] end_cc '
- 'cc: runcmd: [ [ ls, -l ] ] end_cc'),
- {'ssh_import_id': ['smoser', 'bob'],
- 'runcmd': [['ls', '-l']]}
- ),
- # Parse and merge multiple encoded yaml content sections.
- (
- ('cc:ssh_import_id%3A%20%5Bsmoser%5D end_cc '
- 'cc:runcmd%3A%20%5B%20%5B%20ls%2C%20-l%20%5D%20%5D end_cc'),
- {'ssh_import_id': ['smoser'], 'runcmd': [['ls', '-l']]}
- ),
- ]
- )
- def test_read_conf_from_cmdline_config(self, expected_cfg, cmdline):
- assert expected_cfg == util.read_conf_from_cmdline(cmdline=cmdline)
-
-
-class TestMountCb:
- """Tests for ``util.mount_cb``.
-
- These tests consider the "unit" under test to be ``util.mount_cb`` and
- ``util.unmounter``, which is only used by ``mount_cb``.
-
- TODO: Test default mtype determination
- TODO: Test the if/else branch that actually performs the mounting operation
- """
-
- @pytest.yield_fixture
- def already_mounted_device_and_mountdict(self):
- """Mock an already-mounted device, and yield (device, mount dict)"""
- device = "/dev/fake0"
- mountpoint = "/mnt/fake"
- with mock.patch("cloudinit.util.subp.subp"):
- with mock.patch("cloudinit.util.mounts") as m_mounts:
- mounts = {device: {"mountpoint": mountpoint}}
- m_mounts.return_value = mounts
- yield device, mounts[device]
-
- @pytest.fixture
- def already_mounted_device(self, already_mounted_device_and_mountdict):
- """already_mounted_device_and_mountdict, but return only the device"""
- return already_mounted_device_and_mountdict[0]
-
- @pytest.mark.parametrize(
- "mtype,expected",
- [
- # While the filesystem is called iso9660, the mount type is cd9660
- ("iso9660", "cd9660"),
- # vfat is generally called "msdos" on BSD
- ("vfat", "msdos"),
- # judging from man pages, only FreeBSD has this alias
- ("msdosfs", "msdos"),
- # Test happy path
- ("ufs", "ufs")
- ],
- )
- @mock.patch("cloudinit.util.is_Linux", autospec=True)
- @mock.patch("cloudinit.util.is_BSD", autospec=True)
- @mock.patch("cloudinit.util.subp.subp")
- @mock.patch("cloudinit.temp_utils.tempdir", autospec=True)
- def test_normalize_mtype_on_bsd(
- self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected
- ):
- m_is_BSD.return_value = True
- m_is_Linux.return_value = False
- m_tmpdir.return_value.__enter__ = mock.Mock(
- autospec=True, return_value="/tmp/fake"
- )
- m_tmpdir.return_value.__exit__ = mock.Mock(
- autospec=True, return_value=True
- )
- callback = mock.Mock(autospec=True)
-
- util.mount_cb('/dev/fake0', callback, mtype=mtype)
- assert mock.call(
- ["mount", "-o", "ro", "-t", expected, "/dev/fake0", "/tmp/fake"],
- update_env=None) in m_subp.call_args_list
-
- @pytest.mark.parametrize("invalid_mtype", [int(0), float(0.0), dict()])
- def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype):
- with pytest.raises(TypeError):
- util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype)
-
- @mock.patch("cloudinit.util.subp.subp")
- def test_already_mounted_does_not_mount_or_umount_anything(
- self, m_subp, already_mounted_device
- ):
- util.mount_cb(already_mounted_device, mock.Mock())
-
- assert 0 == m_subp.call_count
-
- @pytest.mark.parametrize("trailing_slash_in_mounts", ["/", ""])
- def test_already_mounted_calls_callback(
- self, trailing_slash_in_mounts, already_mounted_device_and_mountdict
- ):
- device, mount_dict = already_mounted_device_and_mountdict
- mountpoint = mount_dict["mountpoint"]
- mount_dict["mountpoint"] += trailing_slash_in_mounts
-
- callback = mock.Mock()
- util.mount_cb(device, callback)
-
- # The mountpoint passed to callback should always have a trailing
- # slash, regardless of the input
- assert [mock.call(mountpoint + "/")] == callback.call_args_list
-
- def test_already_mounted_calls_callback_with_data(
- self, already_mounted_device
- ):
- callback = mock.Mock()
- util.mount_cb(
- already_mounted_device, callback, data=mock.sentinel.data
- )
-
- assert [
- mock.call(mock.ANY, mock.sentinel.data)
- ] == callback.call_args_list
-
-
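Taken together, the ``TestMountCb`` cases above describe the callback contract of ``util.mount_cb``: the callback receives a mountpoint that always ends in a slash, plus the optional ``data`` argument, and already-mounted devices are neither mounted nor unmounted again. A minimal usage sketch, where the device path and label are illustrative values and it is assumed (not asserted above) that ``mount_cb`` forwards the callback's return value:

from cloudinit import util

def read_seed(mountpoint, label=None):
    # mountpoint is guaranteed to carry a trailing slash, so plain string
    # concatenation is safe; label is only filled in when data= is passed.
    with open(mountpoint + "user-data") as f:
        return label, f.read()

# /dev/sr0 and the "cidata" label are illustrative values only.
seed = util.mount_cb("/dev/sr0", read_seed, data="cidata", mtype="iso9660")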
-@mock.patch("cloudinit.util.write_file")
-class TestEnsureFile:
- """Tests for ``cloudinit.util.ensure_file``."""
-
- def test_parameters_passed_through(self, m_write_file):
-        """Test that parameters in the signature are passed to write_file."""
- util.ensure_file(
- mock.sentinel.path,
- mode=mock.sentinel.mode,
- preserve_mode=mock.sentinel.preserve_mode,
- )
-
- assert 1 == m_write_file.call_count
- args, kwargs = m_write_file.call_args
- assert (mock.sentinel.path,) == args
- assert mock.sentinel.mode == kwargs["mode"]
- assert mock.sentinel.preserve_mode == kwargs["preserve_mode"]
-
- @pytest.mark.parametrize(
- "kwarg,expected",
- [
- # Files should be world-readable by default
- ("mode", 0o644),
- # The previous behaviour of not preserving mode should be retained
- ("preserve_mode", False),
- ],
- )
- def test_defaults(self, m_write_file, kwarg, expected):
- """Test that ensure_file defaults appropriately."""
- util.ensure_file(mock.sentinel.path)
-
- assert 1 == m_write_file.call_count
- _args, kwargs = m_write_file.call_args
- assert expected == kwargs[kwarg]
-
- def test_static_parameters_are_passed(self, m_write_file):
-        """Test that the static write_file parameters are passed correctly."""
- util.ensure_file(mock.sentinel.path)
-
- assert 1 == m_write_file.call_count
- _args, kwargs = m_write_file.call_args
- assert "" == kwargs["content"]
- assert "ab" == kwargs["omode"]
-
-
-# vi: ts=4 expandtab
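The ``TestEnsureFile`` cases above pin down ``util.ensure_file`` as a thin wrapper around ``write_file``: empty content, append mode, a world-readable 0o644 default, and no mode preservation unless requested. A sketch of equivalent behaviour, an assumed shape rather than the actual upstream implementation:

from cloudinit import util

def ensure_file_sketch(path, mode=0o644, *, preserve_mode=False):
    # content="" with omode="ab" means "create the file if missing, never
    # truncate it"; mode and preserve_mode are forwarded unchanged.
    util.write_file(
        path, content="", omode="ab", mode=mode, preserve_mode=preserve_mode
    )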
diff --git a/cloudinit/tests/test_version.py b/cloudinit/tests/test_version.py
deleted file mode 100644
index 778a762c..00000000
--- a/cloudinit/tests/test_version.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from unittest import mock
-
-from cloudinit.tests.helpers import CiTestCase
-from cloudinit import version
-
-
-class TestExportsFeatures(CiTestCase):
- def test_has_network_config_v1(self):
- self.assertIn('NETWORK_CONFIG_V1', version.FEATURES)
-
- def test_has_network_config_v2(self):
- self.assertIn('NETWORK_CONFIG_V2', version.FEATURES)
-
-
-class TestVersionString(CiTestCase):
- @mock.patch("cloudinit.version._PACKAGED_VERSION",
- "17.2-3-gb05b9972-0ubuntu1")
- def test_package_version_respected(self):
- """If _PACKAGED_VERSION is filled in, then it should be returned."""
- self.assertEqual("17.2-3-gb05b9972-0ubuntu1", version.version_string())
-
- @mock.patch("cloudinit.version._PACKAGED_VERSION", "@@PACKAGED_VERSION@@")
- @mock.patch("cloudinit.version.__VERSION__", "17.2")
- def test_package_version_skipped(self):
- """If _PACKAGED_VERSION is not modified, then return __VERSION__."""
- self.assertEqual("17.2", version.version_string())
-
-
-# vi: ts=4 expandtab
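For reference, the behaviour asserted by ``TestVersionString`` amounts to the following; this is a sketch of equivalent logic rather than the upstream code, and the version strings are just the values mocked in the tests:

__VERSION__ = "17.2"
_PACKAGED_VERSION = "@@PACKAGED_VERSION@@"

def version_string():
    # Packaging replaces the @@PACKAGED_VERSION@@ placeholder at build time;
    # if that never happened, fall back to the in-tree __VERSION__.
    if _PACKAGED_VERSION != "@@PACKAGED_VERSION@@":
        return _PACKAGED_VERSION
    return __VERSION__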