summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_smartos.py
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2018-09-05 16:02:25 +0000
committerServer Team CI Bot <josh.powers+server-team-bot@canonical.com>2018-09-05 16:02:25 +0000
commita8dcad9ac62bb1d2a4f7489960395bad6cac9382 (patch)
tree837df7fce2bdc2c05ce7c3bf497fc8a5be5cb99f /tests/unittests/test_datasource/test_smartos.py
parentdb50bc0d999e3a90136864a774f85e4e15b144e8 (diff)
downloadvyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.tar.gz
vyos-cloud-init-a8dcad9ac62bb1d2a4f7489960395bad6cac9382.zip
tests: Disallow use of util.subp except for where needed.
In many cases, cloud-init uses 'util.subp' to run a subprocess. This is not really desirable in our unit tests as it makes the tests dependent upon existance of those utilities. The change here is to modify the base test case class (CiTestCase) to raise exception any time subp is called. Then, fix all callers. For cases where subp is necessary or actually desired, we can use it via   a.) context hander CiTestCase.allow_subp(value)   b.) class level self.allowed_subp = value Both cases the value is a list of acceptable executable names that will be called (essentially argv[0]). Some cleanups in AltCloud were done as the code was being updated.
Diffstat (limited to 'tests/unittests/test_datasource/test_smartos.py')
-rw-r--r--tests/unittests/test_datasource/test_smartos.py94
1 files changed, 64 insertions, 30 deletions
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index dca0b3d4..46d67b94 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -20,10 +20,8 @@ import multiprocessing
import os
import os.path
import re
-import shutil
import signal
import stat
-import tempfile
import unittest2
import uuid
@@ -31,15 +29,27 @@ from cloudinit import serial
from cloudinit.sources import DataSourceSmartOS
from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
- SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ)
+ SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
+ identify_file)
import six
from cloudinit import helpers as c_helpers
-from cloudinit.util import (b64e, subp)
+from cloudinit.util import (
+ b64e, subp, ProcessExecutionError, which, write_file)
-from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase
+from cloudinit.tests.helpers import (
+ CiTestCase, mock, FilesystemMockingTestCase, skipIf)
+
+try:
+ import serial as _pyserial
+ assert _pyserial # avoid pyflakes error F401: import unused
+ HAS_PYSERIAL = True
+except ImportError:
+ HAS_PYSERIAL = False
+
+DSMOS = 'cloudinit.sources.DataSourceSmartOS'
SDC_NICS = json.loads("""
[
{
@@ -366,37 +376,33 @@ class PsuedoJoyentClient(object):
class TestSmartOSDataSource(FilesystemMockingTestCase):
+ jmc_cfact = None
+ get_smartos_environ = None
+
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
- dsmos = 'cloudinit.sources.DataSourceSmartOS'
- patcher = mock.patch(dsmos + ".jmc_client_factory")
- self.jmc_cfact = patcher.start()
- self.addCleanup(patcher.stop)
- patcher = mock.patch(dsmos + ".get_smartos_environ")
- self.get_smartos_environ = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths(
- {'cloud_dir': self.tmp, 'run_dir': self.tmp})
-
- self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ")
+ self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact")
+ self.legacy_user_d = self.tmp_path('legacy_user_tmp')
os.mkdir(self.legacy_user_d)
-
- self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
- DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
-
- def tearDown(self):
- DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
- super(TestSmartOSDataSource, self).tearDown()
+ self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d",
+ autospec=False, new=self.legacy_user_d)
+ self.add_patch(DSMOS + ".identify_file", "m_identify_file",
+ return_value="text/plain")
def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
sys_cfg=None, ds_cfg=None):
self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
self.get_smartos_environ.return_value = mode
+ tmpd = self.tmp_dir()
+ dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
+ 'run_dir': self.tmp_path('run_dir')}
+ for d in dirs.values():
+ os.mkdir(d)
+ paths = c_helpers.Paths(dirs)
+
if sys_cfg is None:
sys_cfg = {}
@@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
return DataSourceSmartOS.DataSourceSmartOS(
- sys_cfg, distro=None, paths=self.paths)
+ sys_cfg, distro=None, paths=paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
+ print("legacy_script_f=%s" % legacy_script_f)
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
@@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
+class TestIdentifyFile(CiTestCase):
+ """Test the 'identify_file' utility."""
+ @skipIf(not which("file"), "command 'file' not available.")
+ def test_file_happy_path(self):
+ """Test file is available and functional on plain text."""
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ with self.allow_subp(["file"]):
+ self.assertEqual("text/plain", identify_file(fname))
+
+ @mock.patch(DSMOS + ".util.subp")
+ def test_returns_none_on_error(self, m_subp):
+ """On 'file' execution error, None should be returned."""
+ m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99)
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ self.assertEqual(None, identify_file(fname))
+ self.assertEqual(
+ [mock.call(["file", "--brief", "--mime-type", fname])],
+ m_subp.call_args_list)
+
+
class ShortReader(object):
"""Implements a 'read' interface for bytes provided.
much like io.BytesIO but the 'endbyte' acts as if EOF.
@@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
self.assertEqual(client.list(), [])
-class TestNetworkConversion(TestCase):
+class TestNetworkConversion(CiTestCase):
def test_convert_simple(self):
expected = {
'version': 1,
@@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase):
"Only supported on KVM and bhyve guests under SmartOS")
@unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),
"Requires write access to " + SERIAL_DEVICE)
-class TestSerialConcurrency(TestCase):
+@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available")
+class TestSerialConcurrency(CiTestCase):
"""
This class tests locking on an actual serial port, and as such can only
be run in a kvm or bhyve guest running on a SmartOS host. A test run on
@@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase):
there is only one session over a connection. In contrast, in the
absence of proper locking multiple processes opening the same serial
port can corrupt each others' exchanges with the metadata server.
+
+ This takes on the order of 2 to 3 minutes to run.
"""
+ allowed_subp = ['mdata-get']
+
def setUp(self):
self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)
self.mdata_proc.start()
@@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase):
keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()]
keys.extend(ds.SMARTOS_ATTRIB_JSON.values())
- client = ds.jmc_client_factory()
+ client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM)
self.assertIsNotNone(client)
# The behavior that we are testing for was observed mdata-get running