From d7cad8b61a3b5929a65202e0964aa9b4624e06c4 Mon Sep 17 00:00:00 2001 From: Daniel Watkins Date: Mon, 20 Apr 2020 14:50:37 -0400 Subject: tests: add missing mocks for get_interfaces_by_mac (#326) We currently have a test system where get_interfaces_by_mac raises an exception, which is causing these tests to fail as they aren't mocking get_interfaces_by_mac out. LP: #1873910 --- tests/unittests/test_datasource/test_opennebula.py | 1 + 1 file changed, 1 insertion(+) (limited to 'tests/unittests/test_datasource/test_opennebula.py') diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index bb399f6d..de896a9e 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -355,6 +355,7 @@ class TestOpenNebulaDataSource(CiTestCase): util.find_devs_with = orig_find_devs_with +@mock.patch(DS_PATH + '.net.get_interfaces_by_mac', mock.Mock(return_value={})) class TestOpenNebulaNetwork(unittest.TestCase): system_nics = ('eth0', 'ens3') -- cgit v1.2.3 From 87b8080f3163574580a207f15ff308da6010b0ff Mon Sep 17 00:00:00 2001 From: Daniel Watkins Date: Mon, 8 Jun 2020 17:08:43 -0400 Subject: test: move conftest.py to top-level, to cover tests/ also (#414) * test_opennebula: convert TestParseShellConfig to a pytest test And allow it to run bash. (We aren't aiming to convert TestCase tests to pytest tests as a rule. In this case, I needed to change its implementation to limit subp usage, and I chose pytest over CiTestCase.) * test: move conftest.py to top-level, to cover tests/ also This gives us a single conftest.py which is shared by all tests in the project. --- cloudinit/conftest.py | 72 ---------------------- conftest.py | 72 ++++++++++++++++++++++ tests/unittests/test_datasource/test_opennebula.py | 8 ++- tests/unittests/test_render_cloudcfg.py | 1 + 4 files changed, 79 insertions(+), 74 deletions(-) delete mode 100644 cloudinit/conftest.py create mode 100644 conftest.py (limited to 'tests/unittests/test_datasource/test_opennebula.py') diff --git a/cloudinit/conftest.py b/cloudinit/conftest.py deleted file mode 100644 index 251bca59..00000000 --- a/cloudinit/conftest.py +++ /dev/null @@ -1,72 +0,0 @@ -from unittest import mock - -import pytest - -from cloudinit import subp - - -@pytest.yield_fixture(autouse=True) -def disable_subp_usage(request): - """ - Across all (pytest) tests, ensure that subp.subp is not invoked. - - Note that this can only catch invocations where the util module is imported - and ``subp.subp(...)`` is called. ``from cloudinit.subp mport subp`` - imports happen before the patching here (or the CiTestCase monkey-patching) - happens, so are left untouched. - - To allow a particular test method or class to use subp.subp you can set the - parameter passed to this fixture to False using pytest.mark.parametrize:: - - @pytest.mark.parametrize("disable_subp_usage", [False], indirect=True) - def test_whoami(self): - subp.subp(["whoami"]) - - To instead allow subp.subp usage for a specific command, you can set the - parameter passed to this fixture to that command: - - @pytest.mark.parametrize("disable_subp_usage", ["bash"], indirect=True) - def test_bash(self): - subp.subp(["bash"]) - - To specify multiple commands, set the parameter to a list (note the - double-layered list: we specify a single parameter that is itself a list): - - @pytest.mark.parametrize( - "disable_subp_usage", ["bash", "whoami"], indirect=True) - def test_several_things(self): - subp.subp(["bash"]) - subp.subp(["whoami"]) - - This fixture (roughly) mirrors the functionality of - CiTestCase.allowed_subp. N.B. While autouse fixtures do affect non-pytest - tests, CiTestCase's allowed_subp does take precedence (and we have - TestDisableSubpUsageInTestSubclass to confirm that). - """ - should_disable = getattr(request, "param", True) - if should_disable: - if not isinstance(should_disable, (list, str)): - def side_effect(args, *other_args, **kwargs): - raise AssertionError("Unexpectedly used subp.subp") - else: - # Look this up before our patch is in place, so we have access to - # the real implementation in side_effect - real_subp = subp.subp - - if isinstance(should_disable, str): - should_disable = [should_disable] - - def side_effect(args, *other_args, **kwargs): - cmd = args[0] - if cmd not in should_disable: - raise AssertionError( - "Unexpectedly used subp.subp to call {} (allowed:" - " {})".format(cmd, ",".join(should_disable)) - ) - return real_subp(args, *other_args, **kwargs) - - with mock.patch('cloudinit.subp.subp', autospec=True) as m_subp: - m_subp.side_effect = side_effect - yield - else: - yield diff --git a/conftest.py b/conftest.py new file mode 100644 index 00000000..251bca59 --- /dev/null +++ b/conftest.py @@ -0,0 +1,72 @@ +from unittest import mock + +import pytest + +from cloudinit import subp + + +@pytest.yield_fixture(autouse=True) +def disable_subp_usage(request): + """ + Across all (pytest) tests, ensure that subp.subp is not invoked. + + Note that this can only catch invocations where the util module is imported + and ``subp.subp(...)`` is called. ``from cloudinit.subp mport subp`` + imports happen before the patching here (or the CiTestCase monkey-patching) + happens, so are left untouched. + + To allow a particular test method or class to use subp.subp you can set the + parameter passed to this fixture to False using pytest.mark.parametrize:: + + @pytest.mark.parametrize("disable_subp_usage", [False], indirect=True) + def test_whoami(self): + subp.subp(["whoami"]) + + To instead allow subp.subp usage for a specific command, you can set the + parameter passed to this fixture to that command: + + @pytest.mark.parametrize("disable_subp_usage", ["bash"], indirect=True) + def test_bash(self): + subp.subp(["bash"]) + + To specify multiple commands, set the parameter to a list (note the + double-layered list: we specify a single parameter that is itself a list): + + @pytest.mark.parametrize( + "disable_subp_usage", ["bash", "whoami"], indirect=True) + def test_several_things(self): + subp.subp(["bash"]) + subp.subp(["whoami"]) + + This fixture (roughly) mirrors the functionality of + CiTestCase.allowed_subp. N.B. While autouse fixtures do affect non-pytest + tests, CiTestCase's allowed_subp does take precedence (and we have + TestDisableSubpUsageInTestSubclass to confirm that). + """ + should_disable = getattr(request, "param", True) + if should_disable: + if not isinstance(should_disable, (list, str)): + def side_effect(args, *other_args, **kwargs): + raise AssertionError("Unexpectedly used subp.subp") + else: + # Look this up before our patch is in place, so we have access to + # the real implementation in side_effect + real_subp = subp.subp + + if isinstance(should_disable, str): + should_disable = [should_disable] + + def side_effect(args, *other_args, **kwargs): + cmd = args[0] + if cmd not in should_disable: + raise AssertionError( + "Unexpectedly used subp.subp to call {} (allowed:" + " {})".format(cmd, ",".join(should_disable)) + ) + return real_subp(args, *other_args, **kwargs) + + with mock.patch('cloudinit.subp.subp', autospec=True) as m_subp: + m_subp.side_effect = side_effect + yield + else: + yield diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index de896a9e..7c859c8a 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -9,6 +9,8 @@ import os import pwd import unittest +import pytest + TEST_VARS = { 'VAR1': 'single', @@ -914,12 +916,14 @@ class TestOpenNebulaNetwork(unittest.TestCase): self.assertEqual(expected, net.gen_conf()) -class TestParseShellConfig(unittest.TestCase): +class TestParseShellConfig: + + @pytest.mark.parametrize('disable_subp_usage', ['bash'], indirect=True) def test_no_seconds(self): cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"]) # we could test 'sleep 2', but that would make the test run slower. ret = ds.parse_shell_config(cfg) - self.assertEqual(ret, {"foo": "bar", "xx": "foo"}) + assert ret == {"foo": "bar", "xx": "foo"} def populate_context_dir(path, variables): diff --git a/tests/unittests/test_render_cloudcfg.py b/tests/unittests/test_render_cloudcfg.py index 393a78b1..696915a3 100644 --- a/tests/unittests/test_render_cloudcfg.py +++ b/tests/unittests/test_render_cloudcfg.py @@ -13,6 +13,7 @@ DISTRO_VARIANTS = ["amazon", "arch", "centos", "debian", "fedora", "freebsd", "netbsd", "openbsd", "rhel", "suse", "ubuntu", "unknown"] +@pytest.mark.parametrize('disable_subp_usage', [sys.executable], indirect=True) class TestRenderCloudCfg: cmd = [sys.executable, os.path.realpath('tools/render-cloudcfg')] -- cgit v1.2.3 From 882f1a5f2d5bafd08e6900a2782c3affa67c9d86 Mon Sep 17 00:00:00 2001 From: Daniel Watkins Date: Tue, 30 Jun 2020 14:19:38 -0400 Subject: networking: refactor is_physical from cloudinit.net (#457) As the first refactor PR, this also includes the initial structure for tests. LP: #1884619 --- cloudinit/distros/networking.py | 16 ++- cloudinit/distros/tests/test_networking.py | 42 ++++++ cloudinit/net/__init__.py | 4 - cloudinit/net/tests/test_init.py | 6 - cloudinit/sources/DataSourceDigitalOcean.py | 2 +- cloudinit/sources/DataSourceOpenNebula.py | 30 +++-- cloudinit/sources/helpers/digitalocean.py | 12 +- tests/unittests/test_datasource/test_opennebula.py | 149 +++++++++++++-------- 8 files changed, 181 insertions(+), 80 deletions(-) create mode 100644 cloudinit/distros/tests/test_networking.py (limited to 'tests/unittests/test_datasource/test_opennebula.py') diff --git a/cloudinit/distros/networking.py b/cloudinit/distros/networking.py index eecdccc6..e421a2ce 100644 --- a/cloudinit/distros/networking.py +++ b/cloudinit/distros/networking.py @@ -1,4 +1,5 @@ import abc +import os from cloudinit import net @@ -79,8 +80,15 @@ class Networking(metaclass=abc.ABCMeta): def is_bridge(self, devname: DeviceName) -> bool: return net.is_bridge(devname) + @abc.abstractmethod def is_physical(self, devname: DeviceName) -> bool: - return net.is_physical(devname) + """ + Is ``devname`` a physical network device? + + Examples of non-physical network devices: bonds, bridges, tunnels, + loopback devices. + """ + pass def is_renamed(self, devname: DeviceName) -> bool: return net.is_renamed(devname) @@ -103,7 +111,8 @@ class Networking(metaclass=abc.ABCMeta): class BSDNetworking(Networking): """Implementation of networking functionality shared across BSDs.""" - pass + def is_physical(self, devname: DeviceName) -> bool: + raise NotImplementedError() class LinuxNetworking(Networking): @@ -126,3 +135,6 @@ class LinuxNetworking(Networking): def is_netfail_standby(self, devname: DeviceName) -> bool: return net.is_netfail_standby(devname) + + def is_physical(self, devname: DeviceName) -> bool: + return os.path.exists(net.sys_dev_path(devname, "device")) diff --git a/cloudinit/distros/tests/test_networking.py b/cloudinit/distros/tests/test_networking.py new file mode 100644 index 00000000..2acb12f4 --- /dev/null +++ b/cloudinit/distros/tests/test_networking.py @@ -0,0 +1,42 @@ +from unittest import mock + +import pytest + +from cloudinit.distros.networking import BSDNetworking, LinuxNetworking + + +@pytest.yield_fixture +def sys_class_net(tmpdir): + sys_class_net_path = tmpdir.join("sys/class/net") + sys_class_net_path.ensure_dir() + with mock.patch( + "cloudinit.net.get_sys_class_path", + return_value=sys_class_net_path.strpath + "/", + ): + yield sys_class_net_path + + +class TestBSDNetworkingIsPhysical: + def test_raises_notimplementederror(self): + with pytest.raises(NotImplementedError): + BSDNetworking().is_physical("eth0") + + +class TestLinuxNetworkingIsPhysical: + def test_returns_false_by_default(self, sys_class_net): + assert not LinuxNetworking().is_physical("eth0") + + def test_returns_false_if_devname_exists_but_not_physical( + self, sys_class_net + ): + devname = "eth0" + sys_class_net.join(devname).mkdir() + assert not LinuxNetworking().is_physical(devname) + + def test_returns_true_if_device_is_physical(self, sys_class_net): + devname = "eth0" + device_dir = sys_class_net.join(devname) + device_dir.mkdir() + device_dir.join("device").write("") + + assert LinuxNetworking().is_physical(devname) diff --git a/cloudinit/net/__init__.py b/cloudinit/net/__init__.py index fd5a1b3e..9d8c7ba9 100644 --- a/cloudinit/net/__init__.py +++ b/cloudinit/net/__init__.py @@ -262,10 +262,6 @@ def is_vlan(devname): return 'DEVTYPE=vlan' in uevent.splitlines() -def is_physical(devname): - return os.path.exists(sys_dev_path(devname, "device")) - - def device_driver(devname): """Return the device driver for net device named 'devname'.""" driver = None diff --git a/cloudinit/net/tests/test_init.py b/cloudinit/net/tests/test_init.py index e36d4387..eb458c39 100644 --- a/cloudinit/net/tests/test_init.py +++ b/cloudinit/net/tests/test_init.py @@ -198,12 +198,6 @@ class TestReadSysNet(CiTestCase): write_file(os.path.join(self.sysdir, 'eth0', 'uevent'), content) self.assertTrue(net.is_vlan('eth0')) - def test_is_physical(self): - """is_physical is True when /sys/net/devname/device exists.""" - self.assertFalse(net.is_physical('eth0')) - ensure_file(os.path.join(self.sysdir, 'eth0', 'device')) - self.assertTrue(net.is_physical('eth0')) - class TestGenerateFallbackConfig(CiTestCase): diff --git a/cloudinit/sources/DataSourceDigitalOcean.py b/cloudinit/sources/DataSourceDigitalOcean.py index e0ef665e..5040ce5b 100644 --- a/cloudinit/sources/DataSourceDigitalOcean.py +++ b/cloudinit/sources/DataSourceDigitalOcean.py @@ -58,7 +58,7 @@ class DataSourceDigitalOcean(sources.DataSource): ipv4LL_nic = None if self.use_ip4LL: - ipv4LL_nic = do_helper.assign_ipv4_link_local() + ipv4LL_nic = do_helper.assign_ipv4_link_local(self.distro) md = do_helper.read_metadata( self.metadata_address, timeout=self.timeout, diff --git a/cloudinit/sources/DataSourceOpenNebula.py b/cloudinit/sources/DataSourceOpenNebula.py index c7fdc2a5..12b1f94f 100644 --- a/cloudinit/sources/DataSourceOpenNebula.py +++ b/cloudinit/sources/DataSourceOpenNebula.py @@ -13,6 +13,7 @@ # This file is part of cloud-init. See LICENSE file for license information. import collections +import functools import os import pwd import re @@ -60,10 +61,19 @@ class DataSourceOpenNebula(sources.DataSource): for cdev in candidates: try: if os.path.isdir(self.seed_dir): - results = read_context_disk_dir(cdev, asuser=parseuser) + results = read_context_disk_dir( + cdev, self.distro, asuser=parseuser + ) elif cdev.startswith("/dev"): - results = util.mount_cb(cdev, read_context_disk_dir, - data=parseuser) + # util.mount_cb only handles passing a single argument + # through to the wrapped function, so we have to partially + # apply the function to pass in `distro`. See LP: #1884979 + partially_applied_func = functools.partial( + read_context_disk_dir, + asuser=parseuser, + distro=self.distro, + ) + results = util.mount_cb(cdev, partially_applied_func) except NonContextDiskDir: continue except BrokenContextDiskDir as exc: @@ -129,10 +139,10 @@ class BrokenContextDiskDir(Exception): class OpenNebulaNetwork(object): - def __init__(self, context, system_nics_by_mac=None): + def __init__(self, context, distro, system_nics_by_mac=None): self.context = context if system_nics_by_mac is None: - system_nics_by_mac = get_physical_nics_by_mac() + system_nics_by_mac = get_physical_nics_by_mac(distro) self.ifaces = collections.OrderedDict( [k for k in sorted(system_nics_by_mac.items(), key=lambda k: net.natural_sort_key(k[1]))]) @@ -367,7 +377,7 @@ def parse_shell_config(content, keylist=None, bash=None, asuser=None, return ret -def read_context_disk_dir(source_dir, asuser=None): +def read_context_disk_dir(source_dir, distro, asuser=None): """ read_context_disk_dir(source_dir): read source_dir and return a tuple with metadata dict and user-data @@ -450,15 +460,17 @@ def read_context_disk_dir(source_dir, asuser=None): # http://docs.opennebula.org/5.4/operation/references/template.html#context-section ipaddr_keys = [k for k in context if re.match(r'^ETH\d+_IP.*$', k)] if ipaddr_keys: - onet = OpenNebulaNetwork(context) + onet = OpenNebulaNetwork(context, distro) results['network-interfaces'] = onet.gen_conf() return results -def get_physical_nics_by_mac(): +def get_physical_nics_by_mac(distro): devs = net.get_interfaces_by_mac() - return dict([(m, n) for m, n in devs.items() if net.is_physical(n)]) + return dict( + [(m, n) for m, n in devs.items() if distro.networking.is_physical(n)] + ) # Legacy: Must be present in case we load an old pkl object diff --git a/cloudinit/sources/helpers/digitalocean.py b/cloudinit/sources/helpers/digitalocean.py index f5bbe46a..b545c4d6 100644 --- a/cloudinit/sources/helpers/digitalocean.py +++ b/cloudinit/sources/helpers/digitalocean.py @@ -16,7 +16,7 @@ NIC_MAP = {'public': 'eth0', 'private': 'eth1'} LOG = logging.getLogger(__name__) -def assign_ipv4_link_local(nic=None): +def assign_ipv4_link_local(distro, nic=None): """Bring up NIC using an address using link-local (ip4LL) IPs. On DigitalOcean, the link-local domain is per-droplet routed, so there is no risk of collisions. However, to be more safe, the ip4LL @@ -24,7 +24,7 @@ def assign_ipv4_link_local(nic=None): """ if not nic: - nic = get_link_local_nic() + nic = get_link_local_nic(distro) LOG.debug("selected interface '%s' for reading metadata", nic) if not nic: @@ -54,8 +54,12 @@ def assign_ipv4_link_local(nic=None): return nic -def get_link_local_nic(): - nics = [f for f in cloudnet.get_devicelist() if cloudnet.is_physical(f)] +def get_link_local_nic(distro): + nics = [ + f + for f in cloudnet.get_devicelist() + if distro.networking.is_physical(f) + ] if not nics: return None return min(nics, key=lambda d: cloudnet.read_sys_net_int(d, 'ifindex')) diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index 7c859c8a..80841182 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -132,18 +132,18 @@ class TestOpenNebulaDataSource(CiTestCase): def test_seed_dir_non_contextdisk(self): self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, - self.seed_dir) + self.seed_dir, mock.Mock()) def test_seed_dir_empty1_context(self): populate_dir(self.seed_dir, {'context.sh': ''}) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertIsNone(results['userdata']) self.assertEqual(results['metadata'], {}) def test_seed_dir_empty2_context(self): populate_context_dir(self.seed_dir, {}) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertIsNone(results['userdata']) self.assertEqual(results['metadata'], {}) @@ -153,11 +153,11 @@ class TestOpenNebulaDataSource(CiTestCase): self.assertRaises(ds.BrokenContextDiskDir, ds.read_context_disk_dir, - self.seed_dir) + self.seed_dir, mock.Mock()) def test_context_parser(self): populate_context_dir(self.seed_dir, TEST_VARS) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('metadata' in results) self.assertEqual(TEST_VARS, results['metadata']) @@ -168,7 +168,7 @@ class TestOpenNebulaDataSource(CiTestCase): for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'): my_d = os.path.join(self.tmp, "%s-%i" % (k, c)) populate_context_dir(my_d, {k: '\n'.join(public_keys)}) - results = ds.read_context_disk_dir(my_d) + results = ds.read_context_disk_dir(my_d, mock.Mock()) self.assertTrue('metadata' in results) self.assertTrue('public-keys' in results['metadata']) @@ -182,7 +182,7 @@ class TestOpenNebulaDataSource(CiTestCase): my_d = os.path.join(self.tmp, k) populate_context_dir(my_d, {k: USER_DATA, 'USERDATA_ENCODING': ''}) - results = ds.read_context_disk_dir(my_d) + results = ds.read_context_disk_dir(my_d, mock.Mock()) self.assertTrue('userdata' in results) self.assertEqual(USER_DATA, results['userdata']) @@ -192,7 +192,7 @@ class TestOpenNebulaDataSource(CiTestCase): for k in ('USER_DATA', 'USERDATA'): my_d = os.path.join(self.tmp, k) populate_context_dir(my_d, {k: b64userdata}) - results = ds.read_context_disk_dir(my_d) + results = ds.read_context_disk_dir(my_d, mock.Mock()) self.assertTrue('userdata' in results) self.assertEqual(b64userdata, results['userdata']) @@ -202,7 +202,7 @@ class TestOpenNebulaDataSource(CiTestCase): my_d = os.path.join(self.tmp, k) populate_context_dir(my_d, {k: util.b64e(USER_DATA), 'USERDATA_ENCODING': 'base64'}) - results = ds.read_context_disk_dir(my_d) + results = ds.read_context_disk_dir(my_d, mock.Mock()) self.assertTrue('userdata' in results) self.assertEqual(USER_DATA, results['userdata']) @@ -214,7 +214,7 @@ class TestOpenNebulaDataSource(CiTestCase): for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'): my_d = os.path.join(self.tmp, k) populate_context_dir(my_d, {k: PUBLIC_IP}) - results = ds.read_context_disk_dir(my_d) + results = ds.read_context_disk_dir(my_d, mock.Mock()) self.assertTrue('metadata' in results) self.assertTrue('local-hostname' in results['metadata']) @@ -229,7 +229,7 @@ class TestOpenNebulaDataSource(CiTestCase): # without ETH0_MAC # for Older OpenNebula? populate_context_dir(self.seed_dir, {'ETH0_IP': IP_BY_MACADDR}) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -239,7 +239,7 @@ class TestOpenNebulaDataSource(CiTestCase): # ETH0_IP and ETH0_MAC populate_context_dir( self.seed_dir, {'ETH0_IP': IP_BY_MACADDR, 'ETH0_MAC': MACADDR}) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -251,7 +251,7 @@ class TestOpenNebulaDataSource(CiTestCase): # "AR = [ TYPE = ETHER ]" populate_context_dir( self.seed_dir, {'ETH0_IP': '', 'ETH0_MAC': MACADDR}) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -265,7 +265,7 @@ class TestOpenNebulaDataSource(CiTestCase): 'ETH0_MAC': MACADDR, 'ETH0_MASK': '255.255.0.0' }) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -279,7 +279,7 @@ class TestOpenNebulaDataSource(CiTestCase): 'ETH0_MAC': MACADDR, 'ETH0_MASK': '' }) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -292,7 +292,7 @@ class TestOpenNebulaDataSource(CiTestCase): 'ETH0_IP6': IP6_GLOBAL, 'ETH0_MAC': MACADDR, }) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -305,7 +305,7 @@ class TestOpenNebulaDataSource(CiTestCase): 'ETH0_IP6_ULA': IP6_ULA, 'ETH0_MAC': MACADDR, }) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -319,7 +319,7 @@ class TestOpenNebulaDataSource(CiTestCase): 'ETH0_IP6_PREFIX_LENGTH': IP6_PREFIX, 'ETH0_MAC': MACADDR, }) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -333,7 +333,7 @@ class TestOpenNebulaDataSource(CiTestCase): 'ETH0_IP6_PREFIX_LENGTH': '', 'ETH0_MAC': MACADDR, }) - results = ds.read_context_disk_dir(self.seed_dir) + results = ds.read_context_disk_dir(self.seed_dir, mock.Mock()) self.assertTrue('network-interfaces' in results) self.assertTrue( @@ -370,7 +370,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): expected = { '02:00:0a:12:01:01': 'ETH0', '02:00:0a:12:0f:0f': 'ETH1', } - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(expected, net.context_devname) def test_get_nameservers(self): @@ -385,21 +385,21 @@ class TestOpenNebulaNetwork(unittest.TestCase): expected = { 'addresses': ['1.2.3.6', '1.2.3.7', '1.2.3.8'], 'search': ['example.com', 'example.org']} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_nameservers('eth0') self.assertEqual(expected, val) def test_get_mtu(self): """Verify get_mtu('device') correctly returns MTU size.""" context = {'ETH0_MTU': '1280'} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_mtu('eth0') self.assertEqual('1280', val) def test_get_ip(self): """Verify get_ip('device') correctly returns IPv4 address.""" context = {'ETH0_IP': PUBLIC_IP} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_ip('eth0', MACADDR) self.assertEqual(PUBLIC_IP, val) @@ -410,7 +410,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): string. """ context = {'ETH0_IP': ''} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_ip('eth0', MACADDR) self.assertEqual(IP_BY_MACADDR, val) @@ -423,7 +423,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'ETH0_IP6': IP6_GLOBAL, 'ETH0_IP6_ULA': '', } expected = [IP6_GLOBAL] - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_ip6('eth0') self.assertEqual(expected, val) @@ -436,7 +436,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'ETH0_IP6': '', 'ETH0_IP6_ULA': IP6_ULA, } expected = [IP6_ULA] - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_ip6('eth0') self.assertEqual(expected, val) @@ -449,7 +449,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'ETH0_IP6': IP6_GLOBAL, 'ETH0_IP6_ULA': IP6_ULA, } expected = [IP6_GLOBAL, IP6_ULA] - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_ip6('eth0') self.assertEqual(expected, val) @@ -458,7 +458,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): Verify get_ip6_prefix('device') correctly returns IPv6 prefix. """ context = {'ETH0_IP6_PREFIX_LENGTH': IP6_PREFIX} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_ip6_prefix('eth0') self.assertEqual(IP6_PREFIX, val) @@ -469,7 +469,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): string. """ context = {'ETH0_IP6_PREFIX_LENGTH': ''} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_ip6_prefix('eth0') self.assertEqual('64', val) @@ -479,7 +479,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): address. """ context = {'ETH0_GATEWAY': '1.2.3.5'} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_gateway('eth0') self.assertEqual('1.2.3.5', val) @@ -489,7 +489,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): address. """ context = {'ETH0_GATEWAY6': IP6_GW} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_gateway6('eth0') self.assertEqual(IP6_GW, val) @@ -498,7 +498,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): Verify get_mask('device') correctly returns IPv4 subnet mask. """ context = {'ETH0_MASK': '255.255.0.0'} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_mask('eth0') self.assertEqual('255.255.0.0', val) @@ -508,7 +508,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): It returns default value '255.255.255.0' if ETH0_MASK has empty string. """ context = {'ETH0_MASK': ''} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_mask('eth0') self.assertEqual('255.255.255.0', val) @@ -517,7 +517,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): Verify get_network('device') correctly returns IPv4 network address. """ context = {'ETH0_NETWORK': '1.2.3.0'} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_network('eth0', MACADDR) self.assertEqual('1.2.3.0', val) @@ -528,7 +528,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): empty string. """ context = {'ETH0_NETWORK': ''} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_network('eth0', MACADDR) self.assertEqual('10.18.1.0', val) @@ -537,7 +537,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): Verify get_field('device', 'name') returns *context* value. """ context = {'ETH9_DUMMY': 'DUMMY_VALUE'} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_field('eth9', 'dummy') self.assertEqual('DUMMY_VALUE', val) @@ -547,7 +547,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): value. """ context = {'ETH9_DUMMY': 'DUMMY_VALUE'} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_field('eth9', 'dummy', 'DEFAULT_VALUE') self.assertEqual('DUMMY_VALUE', val) @@ -557,7 +557,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): value if context value is empty string. """ context = {'ETH9_DUMMY': ''} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_field('eth9', 'dummy', 'DEFAULT_VALUE') self.assertEqual('DEFAULT_VALUE', val) @@ -567,7 +567,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): empty string. """ context = {'ETH9_DUMMY': ''} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_field('eth9', 'dummy') self.assertEqual(None, val) @@ -577,7 +577,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): None. """ context = {'ETH9_DUMMY': None} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) val = net.get_field('eth9', 'dummy') self.assertEqual(None, val) @@ -597,7 +597,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) # set ETH0_GATEWAY @@ -613,7 +613,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) @mock.patch(DS_PATH + ".get_physical_nics_by_mac") @@ -632,7 +632,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) # set ETH0_GATEWAY6 @@ -648,7 +648,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) @mock.patch(DS_PATH + ".get_physical_nics_by_mac") @@ -669,7 +669,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) # set ETH0_IP6, ETH0_IP6_ULA, ETH0_IP6_PREFIX_LENGTH @@ -689,7 +689,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): IP6_GLOBAL + '/' + IP6_PREFIX, IP6_ULA + '/' + IP6_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) @mock.patch(DS_PATH + ".get_physical_nics_by_mac") @@ -710,7 +710,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) # set DNS, ETH0_DNS, ETH0_SEARCH_DOMAIN @@ -730,7 +730,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) @mock.patch(DS_PATH + ".get_physical_nics_by_mac") @@ -749,7 +749,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) # set ETH0_MTU @@ -765,14 +765,14 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork(context) + net = ds.OpenNebulaNetwork(context, mock.Mock()) self.assertEqual(net.gen_conf(), expected) @mock.patch(DS_PATH + ".get_physical_nics_by_mac") def test_eth0(self, m_get_phys_by_mac): for nic in self.system_nics: m_get_phys_by_mac.return_value = {MACADDR: nic} - net = ds.OpenNebulaNetwork({}) + net = ds.OpenNebulaNetwork({}, mock.Mock()) expected = { 'version': 2, 'ethernets': { @@ -782,6 +782,14 @@ class TestOpenNebulaNetwork(unittest.TestCase): self.assertEqual(net.gen_conf(), expected) + @mock.patch(DS_PATH + ".get_physical_nics_by_mac") + def test_distro_passed_through(self, m_get_physical_nics_by_mac): + ds.OpenNebulaNetwork({}, mock.sentinel.distro) + self.assertEqual( + [mock.call(mock.sentinel.distro)], + m_get_physical_nics_by_mac.call_args_list, + ) + def test_eth0_override(self): self.maxDiff = None context = { @@ -800,7 +808,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'ETH0_SEARCH_DOMAIN': '', } for nic in self.system_nics: - net = ds.OpenNebulaNetwork(context, + net = ds.OpenNebulaNetwork(context, mock.Mock(), system_nics_by_mac={MACADDR: nic}) expected = { 'version': 2, @@ -832,7 +840,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'ETH0_SEARCH_DOMAIN': 'example.com example.org', } for nic in self.system_nics: - net = ds.OpenNebulaNetwork(context, + net = ds.OpenNebulaNetwork(context, mock.Mock(), system_nics_by_mac={MACADDR: nic}) expected = { @@ -886,7 +894,10 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'ETH3_SEARCH_DOMAIN': 'third.example.com third.example.org', } net = ds.OpenNebulaNetwork( - context, system_nics_by_mac={MAC_1: 'enp0s25', MAC_2: 'enp1s2'}) + context, + mock.Mock(), + system_nics_by_mac={MAC_1: 'enp0s25', MAC_2: 'enp1s2'} + ) expected = { 'version': 2, @@ -926,6 +937,36 @@ class TestParseShellConfig: assert ret == {"foo": "bar", "xx": "foo"} +class TestGetPhysicalNicsByMac: + @pytest.mark.parametrize( + "interfaces_by_mac,physical_devs,expected_return", + [ + # No interfaces => empty return + ({}, [], {}), + # Only virtual interface => empty return + ({"mac1": "virtual0"}, [], {}), + # Only physical interface => it is returned + ({"mac2": "physical0"}, ["physical0"], {"mac2": "physical0"}), + # Combination of physical and virtual => only physical returned + ( + {"mac3": "physical1", "mac4": "virtual1"}, + ["physical1"], + {"mac3": "physical1"}, + ), + ], + ) + def test(self, interfaces_by_mac, physical_devs, expected_return): + distro = mock.Mock() + distro.networking.is_physical.side_effect = ( + lambda devname: devname in physical_devs + ) + with mock.patch( + DS_PATH + ".net.get_interfaces_by_mac", + return_value=interfaces_by_mac, + ): + assert expected_return == ds.get_physical_nics_by_mac(distro) + + def populate_context_dir(path, variables): data = "# Context variables generated by OpenNebula\n" for k, v in variables.items(): -- cgit v1.2.3 From 2b727914e8cbee6810b1bb9a1cfdb90ad521ceb6 Mon Sep 17 00:00:00 2001 From: Daniel Watkins Date: Thu, 2 Jul 2020 13:51:28 -0400 Subject: tests: use markers to configure disable_subp_usage (#473) This is an improvement over indirect parameterisation for a few reasons: * The test code is much easier to read, the mark names are much more intuitive than the indirect parameterisation invocation, and there's less boilerplate to boot * The fixture no longer has to overload the single parameter that fixtures can take with multiple meanings --- cloudinit/tests/test_conftest.py | 14 ++-- conftest.py | 93 ++++++++++++++-------- tests/unittests/test_datasource/test_opennebula.py | 3 +- tests/unittests/test_render_cloudcfg.py | 2 +- tox.ini | 7 ++ 5 files changed, 76 insertions(+), 43 deletions(-) (limited to 'tests/unittests/test_datasource/test_opennebula.py') diff --git a/cloudinit/tests/test_conftest.py b/cloudinit/tests/test_conftest.py index a6537248..6f1263a5 100644 --- a/cloudinit/tests/test_conftest.py +++ b/cloudinit/tests/test_conftest.py @@ -17,12 +17,11 @@ class TestDisableSubpUsage: # pylint: disable=no-value-for-parameter subp.subp() - @pytest.mark.parametrize('disable_subp_usage', [False], indirect=True) + @pytest.mark.allow_all_subp def test_subp_usage_can_be_reenabled(self): subp.subp(['whoami']) - @pytest.mark.parametrize( - 'disable_subp_usage', [['whoami'], 'whoami'], indirect=True) + @pytest.mark.allow_subp_for("whoami") def test_subp_usage_can_be_conditionally_reenabled(self): # The two parameters test each potential invocation with a single # argument @@ -31,8 +30,7 @@ class TestDisableSubpUsage: assert "allowed: whoami" in str(excinfo.value) subp.subp(['whoami']) - @pytest.mark.parametrize( - 'disable_subp_usage', [['whoami', 'bash']], indirect=True) + @pytest.mark.allow_subp_for("whoami", "bash") def test_subp_usage_can_be_conditionally_reenabled_for_multiple_cmds(self): with pytest.raises(AssertionError) as excinfo: subp.subp(["some", "args"]) @@ -40,6 +38,12 @@ class TestDisableSubpUsage: subp.subp(['bash', '-c', 'true']) subp.subp(['whoami']) + @pytest.mark.allow_all_subp + @pytest.mark.allow_subp_for("bash") + def test_both_marks_raise_an_error(self): + with pytest.raises(AssertionError, match="marked both"): + subp.subp(["bash"]) + class TestDisableSubpUsageInTestSubclass(CiTestCase): """Test that disable_subp_usage doesn't impact CiTestCase's subp logic.""" diff --git a/conftest.py b/conftest.py index 251bca59..faf13804 100644 --- a/conftest.py +++ b/conftest.py @@ -5,6 +5,18 @@ import pytest from cloudinit import subp +def _closest_marker_args_or(request, marker_name: str, default): + """Get the args for the closest ``marker_name`` or return ``default``""" + try: + marker = request.node.get_closest_marker(marker_name) + except AttributeError: + # Older versions of pytest don't have the new API + marker = request.node.get_marker(marker_name) + if marker is not None: + return marker.args + return default + + @pytest.yield_fixture(autouse=True) def disable_subp_usage(request): """ @@ -15,25 +27,23 @@ def disable_subp_usage(request): imports happen before the patching here (or the CiTestCase monkey-patching) happens, so are left untouched. - To allow a particular test method or class to use subp.subp you can set the - parameter passed to this fixture to False using pytest.mark.parametrize:: + To allow a particular test method or class to use subp.subp you can mark it + as such:: - @pytest.mark.parametrize("disable_subp_usage", [False], indirect=True) + @pytest.mark.allow_all_subp def test_whoami(self): subp.subp(["whoami"]) - To instead allow subp.subp usage for a specific command, you can set the - parameter passed to this fixture to that command: + To instead allow subp.subp usage for a specific command, you can use the + ``allow_subp_for`` mark:: - @pytest.mark.parametrize("disable_subp_usage", ["bash"], indirect=True) + @pytest.mark.allow_subp_for("bash") def test_bash(self): subp.subp(["bash"]) - To specify multiple commands, set the parameter to a list (note the - double-layered list: we specify a single parameter that is itself a list): + You can pass multiple commands as values; they will all be permitted:: - @pytest.mark.parametrize( - "disable_subp_usage", ["bash", "whoami"], indirect=True) + @pytest.mark.allow_subp_for("bash", "whoami") def test_several_things(self): subp.subp(["bash"]) subp.subp(["whoami"]) @@ -43,30 +53,43 @@ def disable_subp_usage(request): tests, CiTestCase's allowed_subp does take precedence (and we have TestDisableSubpUsageInTestSubclass to confirm that). """ - should_disable = getattr(request, "param", True) - if should_disable: - if not isinstance(should_disable, (list, str)): - def side_effect(args, *other_args, **kwargs): - raise AssertionError("Unexpectedly used subp.subp") - else: - # Look this up before our patch is in place, so we have access to - # the real implementation in side_effect - real_subp = subp.subp - - if isinstance(should_disable, str): - should_disable = [should_disable] - - def side_effect(args, *other_args, **kwargs): - cmd = args[0] - if cmd not in should_disable: - raise AssertionError( - "Unexpectedly used subp.subp to call {} (allowed:" - " {})".format(cmd, ",".join(should_disable)) - ) - return real_subp(args, *other_args, **kwargs) - - with mock.patch('cloudinit.subp.subp', autospec=True) as m_subp: - m_subp.side_effect = side_effect - yield + allow_subp_for = _closest_marker_args_or(request, "allow_subp_for", None) + # Because the mark doesn't take arguments, `allow_all_subp` will be set to + # [] if the marker is present, so explicit None checks are required + allow_all_subp = _closest_marker_args_or(request, "allow_all_subp", None) + + if allow_all_subp is not None and allow_subp_for is None: + # Only allow_all_subp specified, don't mock subp.subp + yield + return + + if allow_all_subp is None and allow_subp_for is None: + # No marks, default behaviour; disallow all subp.subp usage + def side_effect(args, *other_args, **kwargs): + raise AssertionError("Unexpectedly used subp.subp") + + elif allow_all_subp is not None and allow_subp_for is not None: + # Both marks, ambiguous request; raise an exception on all subp usage + def side_effect(args, *other_args, **kwargs): + raise AssertionError( + "Test marked both allow_all_subp and allow_subp_for: resolve" + " this either by modifying your test code, or by modifying" + " disable_subp_usage to handle precedence." + ) else: + # Look this up before our patch is in place, so we have access to + # the real implementation in side_effect + real_subp = subp.subp + + def side_effect(args, *other_args, **kwargs): + cmd = args[0] + if cmd not in allow_subp_for: + raise AssertionError( + "Unexpectedly used subp.subp to call {} (allowed:" + " {})".format(cmd, ",".join(allow_subp_for)) + ) + return real_subp(args, *other_args, **kwargs) + + with mock.patch("cloudinit.subp.subp", autospec=True) as m_subp: + m_subp.side_effect = side_effect yield diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index 80841182..9c6070a5 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -928,8 +928,7 @@ class TestOpenNebulaNetwork(unittest.TestCase): class TestParseShellConfig: - - @pytest.mark.parametrize('disable_subp_usage', ['bash'], indirect=True) + @pytest.mark.allow_subp_for("bash") def test_no_seconds(self): cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"]) # we could test 'sleep 2', but that would make the test run slower. diff --git a/tests/unittests/test_render_cloudcfg.py b/tests/unittests/test_render_cloudcfg.py index 696915a3..495e2669 100644 --- a/tests/unittests/test_render_cloudcfg.py +++ b/tests/unittests/test_render_cloudcfg.py @@ -13,7 +13,7 @@ DISTRO_VARIANTS = ["amazon", "arch", "centos", "debian", "fedora", "freebsd", "netbsd", "openbsd", "rhel", "suse", "ubuntu", "unknown"] -@pytest.mark.parametrize('disable_subp_usage', [sys.executable], indirect=True) +@pytest.mark.allow_subp_for(sys.executable) class TestRenderCloudCfg: cmd = [sys.executable, os.path.realpath('tools/render-cloudcfg')] diff --git a/tox.ini b/tox.ini index 847a2fbd..5257f9e3 100644 --- a/tox.ini +++ b/tox.ini @@ -133,3 +133,10 @@ commands = {envpython} -m tests.cloud_tests {posargs} passenv = HOME TRAVIS deps = -r{toxinidir}/integration-requirements.txt + +[pytest] +# TODO: s/--strict/--strict-markers/ once xenial support is dropped +addopts = --strict +markers = + allow_subp_for: allow subp usage for the given commands (disable_subp_usage) + allow_all_subp: allow all subp usage (disable_subp_usage) -- cgit v1.2.3