diff options
author | Scott Moser <smoser@ubuntu.com> | 2018-09-05 16:02:25 +0000 |
---|---|---|
committer | Server Team CI Bot <josh.powers+server-team-bot@canonical.com> | 2018-09-05 16:02:25 +0000 |
commit | a8dcad9ac62bb1d2a4f7489960395bad6cac9382 (patch) | |
tree | 837df7fce2bdc2c05ce7c3bf497fc8a5be5cb99f /tests/unittests/test_datasource/test_smartos.py | |
parent | db50bc0d999e3a90136864a774f85e4e15b144e8 (diff) | |
download | vyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.tar.gz vyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.zip |
tests: Disallow use of util.subp except for where needed.
In many cases, cloud-init uses 'util.subp' to run a subprocess.
This is not really desirable in our unit tests as it makes the tests
dependent upon existance of those utilities.
The change here is to modify the base test case class (CiTestCase) to
raise exception any time subp is called. Then, fix all callers.
For cases where subp is necessary or actually desired, we can use it
via
a.) context hander CiTestCase.allow_subp(value)
b.) class level self.allowed_subp = value
Both cases the value is a list of acceptable executable names that
will be called (essentially argv[0]).
Some cleanups in AltCloud were done as the code was being updated.
Diffstat (limited to 'tests/unittests/test_datasource/test_smartos.py')
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 94 |
1 files changed, 64 insertions, 30 deletions
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index dca0b3d4..46d67b94 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -20,10 +20,8 @@ import multiprocessing import os import os.path import re -import shutil import signal import stat -import tempfile import unittest2 import uuid @@ -31,15 +29,27 @@ from cloudinit import serial from cloudinit.sources import DataSourceSmartOS from cloudinit.sources.DataSourceSmartOS import ( convert_smartos_network_data as convert_net, - SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ) + SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ, + identify_file) import six from cloudinit import helpers as c_helpers -from cloudinit.util import (b64e, subp) +from cloudinit.util import ( + b64e, subp, ProcessExecutionError, which, write_file) -from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase +from cloudinit.tests.helpers import ( + CiTestCase, mock, FilesystemMockingTestCase, skipIf) + +try: + import serial as _pyserial + assert _pyserial # avoid pyflakes error F401: import unused + HAS_PYSERIAL = True +except ImportError: + HAS_PYSERIAL = False + +DSMOS = 'cloudinit.sources.DataSourceSmartOS' SDC_NICS = json.loads(""" [ { @@ -366,37 +376,33 @@ class PsuedoJoyentClient(object): class TestSmartOSDataSource(FilesystemMockingTestCase): + jmc_cfact = None + get_smartos_environ = None + def setUp(self): super(TestSmartOSDataSource, self).setUp() - dsmos = 'cloudinit.sources.DataSourceSmartOS' - patcher = mock.patch(dsmos + ".jmc_client_factory") - self.jmc_cfact = patcher.start() - self.addCleanup(patcher.stop) - patcher = mock.patch(dsmos + ".get_smartos_environ") - self.get_smartos_environ = patcher.start() - self.addCleanup(patcher.stop) - - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.paths = c_helpers.Paths( - {'cloud_dir': self.tmp, 'run_dir': self.tmp}) - - self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp') + self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ") + self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact") + self.legacy_user_d = self.tmp_path('legacy_user_tmp') os.mkdir(self.legacy_user_d) - - self.orig_lud = DataSourceSmartOS.LEGACY_USER_D - DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d - - def tearDown(self): - DataSourceSmartOS.LEGACY_USER_D = self.orig_lud - super(TestSmartOSDataSource, self).tearDown() + self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d", + autospec=False, new=self.legacy_user_d) + self.add_patch(DSMOS + ".identify_file", "m_identify_file", + return_value="text/plain") def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM, sys_cfg=None, ds_cfg=None): self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata) self.get_smartos_environ.return_value = mode + tmpd = self.tmp_dir() + dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd), + 'run_dir': self.tmp_path('run_dir')} + for d in dirs.values(): + os.mkdir(d) + paths = c_helpers.Paths(dirs) + if sys_cfg is None: sys_cfg = {} @@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): sys_cfg['datasource']['SmartOS'] = ds_cfg return DataSourceSmartOS.DataSourceSmartOS( - sys_cfg, distro=None, paths=self.paths) + sys_cfg, distro=None, paths=paths) def test_no_base64(self): ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} @@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): dsrc.metadata['user-script']) legacy_script_f = "%s/user-script" % self.legacy_user_d + print("legacy_script_f=%s" % legacy_script_f) self.assertTrue(os.path.exists(legacy_script_f)) self.assertTrue(os.path.islink(legacy_script_f)) user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] @@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): mydscfg['disk_aliases']['FOO']) +class TestIdentifyFile(CiTestCase): + """Test the 'identify_file' utility.""" + @skipIf(not which("file"), "command 'file' not available.") + def test_file_happy_path(self): + """Test file is available and functional on plain text.""" + fname = self.tmp_path("myfile") + write_file(fname, "plain text content here\n") + with self.allow_subp(["file"]): + self.assertEqual("text/plain", identify_file(fname)) + + @mock.patch(DSMOS + ".util.subp") + def test_returns_none_on_error(self, m_subp): + """On 'file' execution error, None should be returned.""" + m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99) + fname = self.tmp_path("myfile") + write_file(fname, "plain text content here\n") + self.assertEqual(None, identify_file(fname)) + self.assertEqual( + [mock.call(["file", "--brief", "--mime-type", fname])], + m_subp.call_args_list) + + class ShortReader(object): """Implements a 'read' interface for bytes provided. much like io.BytesIO but the 'endbyte' acts as if EOF. @@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): self.assertEqual(client.list(), []) -class TestNetworkConversion(TestCase): +class TestNetworkConversion(CiTestCase): def test_convert_simple(self): expected = { 'version': 1, @@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase): "Only supported on KVM and bhyve guests under SmartOS") @unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK), "Requires write access to " + SERIAL_DEVICE) -class TestSerialConcurrency(TestCase): +@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available") +class TestSerialConcurrency(CiTestCase): """ This class tests locking on an actual serial port, and as such can only be run in a kvm or bhyve guest running on a SmartOS host. A test run on @@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase): there is only one session over a connection. In contrast, in the absence of proper locking multiple processes opening the same serial port can corrupt each others' exchanges with the metadata server. + + This takes on the order of 2 to 3 minutes to run. """ + allowed_subp = ['mdata-get'] + def setUp(self): self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop) self.mdata_proc.start() @@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase): keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()] keys.extend(ds.SMARTOS_ATTRIB_JSON.values()) - client = ds.jmc_client_factory() + client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM) self.assertIsNotNone(client) # The behavior that we are testing for was observed mdata-get running |