diff options
Diffstat (limited to 'tests/unittests/test_datasource/test_smartos.py')
| -rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 94 | 
1 files changed, 64 insertions, 30 deletions
| diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index dca0b3d4..46d67b94 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -20,10 +20,8 @@ import multiprocessing  import os  import os.path  import re -import shutil  import signal  import stat -import tempfile  import unittest2  import uuid @@ -31,15 +29,27 @@ from cloudinit import serial  from cloudinit.sources import DataSourceSmartOS  from cloudinit.sources.DataSourceSmartOS import (      convert_smartos_network_data as convert_net, -    SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ) +    SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ, +    identify_file)  import six  from cloudinit import helpers as c_helpers -from cloudinit.util import (b64e, subp) +from cloudinit.util import ( +    b64e, subp, ProcessExecutionError, which, write_file) -from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase +from cloudinit.tests.helpers import ( +    CiTestCase, mock, FilesystemMockingTestCase, skipIf) + +try: +    import serial as _pyserial +    assert _pyserial  # avoid pyflakes error F401: import unused +    HAS_PYSERIAL = True +except ImportError: +    HAS_PYSERIAL = False + +DSMOS = 'cloudinit.sources.DataSourceSmartOS'  SDC_NICS = json.loads("""  [      { @@ -366,37 +376,33 @@ class PsuedoJoyentClient(object):  class TestSmartOSDataSource(FilesystemMockingTestCase): +    jmc_cfact = None +    get_smartos_environ = None +      def setUp(self):          super(TestSmartOSDataSource, self).setUp() -        dsmos = 'cloudinit.sources.DataSourceSmartOS' -        patcher = mock.patch(dsmos + ".jmc_client_factory") -        self.jmc_cfact = patcher.start() -        self.addCleanup(patcher.stop) -        patcher = mock.patch(dsmos + ".get_smartos_environ") -        self.get_smartos_environ = patcher.start() -        self.addCleanup(patcher.stop) - -        self.tmp = tempfile.mkdtemp() -        self.addCleanup(shutil.rmtree, self.tmp) -        self.paths = c_helpers.Paths( -            {'cloud_dir': self.tmp, 'run_dir': self.tmp}) - -        self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp') +        self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ") +        self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact") +        self.legacy_user_d = self.tmp_path('legacy_user_tmp')          os.mkdir(self.legacy_user_d) - -        self.orig_lud = DataSourceSmartOS.LEGACY_USER_D -        DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d - -    def tearDown(self): -        DataSourceSmartOS.LEGACY_USER_D = self.orig_lud -        super(TestSmartOSDataSource, self).tearDown() +        self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d", +                       autospec=False, new=self.legacy_user_d) +        self.add_patch(DSMOS + ".identify_file", "m_identify_file", +                       return_value="text/plain")      def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,                  sys_cfg=None, ds_cfg=None):          self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)          self.get_smartos_environ.return_value = mode +        tmpd = self.tmp_dir() +        dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd), +                'run_dir': self.tmp_path('run_dir')} +        for d in dirs.values(): +            os.mkdir(d) +        paths = c_helpers.Paths(dirs) +          if sys_cfg is None:              sys_cfg = {} @@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):              sys_cfg['datasource']['SmartOS'] = ds_cfg          return DataSourceSmartOS.DataSourceSmartOS( -            sys_cfg, distro=None, paths=self.paths) +            sys_cfg, distro=None, paths=paths)      def test_no_base64(self):          ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} @@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):                           dsrc.metadata['user-script'])          legacy_script_f = "%s/user-script" % self.legacy_user_d +        print("legacy_script_f=%s" % legacy_script_f)          self.assertTrue(os.path.exists(legacy_script_f))          self.assertTrue(os.path.islink(legacy_script_f))          user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] @@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):                           mydscfg['disk_aliases']['FOO']) +class TestIdentifyFile(CiTestCase): +    """Test the 'identify_file' utility.""" +    @skipIf(not which("file"), "command 'file' not available.") +    def test_file_happy_path(self): +        """Test file is available and functional on plain text.""" +        fname = self.tmp_path("myfile") +        write_file(fname, "plain text content here\n") +        with self.allow_subp(["file"]): +            self.assertEqual("text/plain", identify_file(fname)) + +    @mock.patch(DSMOS + ".util.subp") +    def test_returns_none_on_error(self, m_subp): +        """On 'file' execution error, None should be returned.""" +        m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99) +        fname = self.tmp_path("myfile") +        write_file(fname, "plain text content here\n") +        self.assertEqual(None, identify_file(fname)) +        self.assertEqual( +            [mock.call(["file", "--brief", "--mime-type", fname])], +            m_subp.call_args_list) + +  class ShortReader(object):      """Implements a 'read' interface for bytes provided.      much like io.BytesIO but the 'endbyte' acts as if EOF. @@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):          self.assertEqual(client.list(), []) -class TestNetworkConversion(TestCase): +class TestNetworkConversion(CiTestCase):      def test_convert_simple(self):          expected = {              'version': 1, @@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase):                        "Only supported on KVM and bhyve guests under SmartOS")  @unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),                        "Requires write access to " + SERIAL_DEVICE) -class TestSerialConcurrency(TestCase): +@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available") +class TestSerialConcurrency(CiTestCase):      """         This class tests locking on an actual serial port, and as such can only         be run in a kvm or bhyve guest running on a SmartOS host.  A test run on @@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase):         there is only one session over a connection.  In contrast, in the         absence of proper locking multiple processes opening the same serial         port can corrupt each others' exchanges with the metadata server. + +       This takes on the order of 2 to 3 minutes to run.      """ +    allowed_subp = ['mdata-get'] +      def setUp(self):          self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)          self.mdata_proc.start() @@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase):          keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()]          keys.extend(ds.SMARTOS_ATTRIB_JSON.values()) -        client = ds.jmc_client_factory() +        client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM)          self.assertIsNotNone(client)          # The behavior that we are testing for was observed mdata-get running | 
