diff options
Diffstat (limited to 'tests/unittests/test_datasource/test_smartos.py')
| -rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 312 | 
1 files changed, 291 insertions, 21 deletions
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index ea20777a..946286bd 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -40,13 +40,9 @@ import six  from cloudinit import helpers as c_helpers  from cloudinit.sources import DataSourceSmartOS  from cloudinit.util import b64e +from cloudinit import util -from .. import helpers - -try: -    from unittest import mock -except ImportError: -    import mock +from ..helpers import mock, TestCase, FilesystemMockingTestCase  SDC_NICS = json.loads("""  [ @@ -103,7 +99,7 @@ MOCK_RETURNS = {      'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),      'user-data': '\n'.join(['something', '']),      'user-script': '\n'.join(['/bin/true', '']), -    'sdc:nics': SDC_NICS, +    'sdc:nics': json.dumps(SDC_NICS),  }  DMI_DATA_RETURN = 'smartdc' @@ -115,12 +111,286 @@ def get_mock_client(mockdata):          def __init__(self, serial):              pass -        def get_metadata(self, metadata_key): +        def get(self, metadata_key):              return mockdata.get(metadata_key)      return MockMetadataClient -class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): +class PsuedoJoyentClient(object): +    def __init__(self, data=None): +        if data is None: +            data = MOCK_RETURNS.copy() +        self.data = data +        return +         +    def get(self, key, default=None, strip=False): +        if key in self.data: +            r = self.data[key] +            if strip: +                r = r.strip() +        else: +            r = default +        return r + +    def get_json(self, key, default=None): +        result = self.get(key, default=default) +        if result is None: +            return default +        return json.loads(result) + +    def exists(self): +        return True + + +class TestSmartOSDataSource(FilesystemMockingTestCase): +    def setUp(self): +        super(TestSmartOSDataSource, self).setUp() + +        dsmos = 'cloudinit.sources.DataSourceSmartOS' +        patcher = mock.patch(dsmos + ".jmc_client_factory") +        self.jmc_cfact = patcher.start() +        self.addCleanup(patcher.stop) +        patcher = mock.patch(dsmos + ".get_smartos_environ") +        self.get_smartos_environ = patcher.start() +        self.addCleanup(patcher.stop) + +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp) +        self.paths = c_helpers.Paths({'cloud_dir': self.tmp}) + +        self.legacy_user_d = tempfile.mkdtemp() +        self.orig_lud = DataSourceSmartOS.LEGACY_USER_D +        DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d + +    def tearDown(self): +        DataSourceSmartOS.LEGACY_USER_D = self.orig_lud +        super(TestSmartOSDataSource, self).tearDown() + +    def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM, +                sys_cfg=None, ds_cfg=None): +        self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata) +        self.get_smartos_environ.return_value = mode + +        if sys_cfg is None: +            sys_cfg = {} + +        if ds_cfg is not None: +            sys_cfg['datasource'] = sys_cfg.get('datasource', {}) +            sys_cfg['datasource']['SmartOS'] = ds_cfg + +        return DataSourceSmartOS.DataSourceSmartOS( +            sys_cfg, distro=None, paths=self.paths) +         +    def test_it_got_here(self): +        dsrc = self._get_ds() +        ret = dsrc.get_data() + +    def test_no_base64(self): +        ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} +        dsrc = self._get_ds(ds_cfg=ds_cfg) +        ret = dsrc.get_data() +        self.assertTrue(ret) + +    def test_uuid(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['sdc:uuid'], +                         dsrc.metadata['instance-id']) + +    def test_root_keys(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['root_authorized_keys'], +                         dsrc.metadata['public-keys']) + +    def test_hostname_b64(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['hostname'], +                         dsrc.metadata['local-hostname']) + +    def test_hostname(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['hostname'], +                         dsrc.metadata['local-hostname']) + +    def test_userdata(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['user-data'], +                         dsrc.metadata['legacy-user-data']) +        self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], +                         dsrc.userdata_raw) + +    def test_sdc_nics(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEquals(json.loads(MOCK_RETURNS['sdc:nics']), +                          dsrc.metadata['network-data']) + +    def test_sdc_scripts(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['user-script'], +                         dsrc.metadata['user-script']) + +        legacy_script_f = "%s/user-script" % self.legacy_user_d +        self.assertTrue(os.path.exists(legacy_script_f)) +        self.assertTrue(os.path.islink(legacy_script_f)) +        user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] +        self.assertEqual(user_script_perm, '700') + +    def test_scripts_shebanged(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['user-script'], +                         dsrc.metadata['user-script']) + +        legacy_script_f = "%s/user-script" % self.legacy_user_d +        self.assertTrue(os.path.exists(legacy_script_f)) +        self.assertTrue(os.path.islink(legacy_script_f)) +        shebang = None +        with open(legacy_script_f, 'r') as f: +            shebang = f.readlines()[0].strip() +        self.assertEqual(shebang, "#!/bin/bash") +        user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] +        self.assertEqual(user_script_perm, '700') + +    def test_scripts_shebang_not_added(self): +        """ +            Test that the SmartOS requirement that plain text scripts +            are executable. This test makes sure that plain texts scripts +            with out file magic have it added appropriately by cloud-init. +        """ + +        my_returns = MOCK_RETURNS.copy() +        my_returns['user-script'] = '\n'.join(['#!/usr/bin/perl', +                                               'print("hi")', '']) + +        dsrc = self._get_ds(mockdata=my_returns) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(my_returns['user-script'], +                         dsrc.metadata['user-script']) + +        legacy_script_f = "%s/user-script" % self.legacy_user_d +        self.assertTrue(os.path.exists(legacy_script_f)) +        self.assertTrue(os.path.islink(legacy_script_f)) +        shebang = None +        with open(legacy_script_f, 'r') as f: +            shebang = f.readlines()[0].strip() +        self.assertEqual(shebang, "#!/usr/bin/perl") + +    def test_userdata_removed(self): +        """ +            User-data in the SmartOS world is supposed to be written to a file +            each and every boot. This tests to make sure that in the event the +            legacy user-data is removed, the existing user-data is backed-up +            and there is no /var/db/user-data left. +        """ + +        user_data_f = "%s/mdata-user-data" % self.legacy_user_d +        with open(user_data_f, 'w') as f: +            f.write("PREVIOUS") + +        my_returns = MOCK_RETURNS.copy() +        del my_returns['user-data'] + +        dsrc = self._get_ds(mockdata=my_returns) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertFalse(dsrc.metadata.get('legacy-user-data')) + +        found_new = False +        for root, _dirs, files in os.walk(self.legacy_user_d): +            for name in files: +                name_f = os.path.join(root, name) +                permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:] +                if re.match(r'.*\/mdata-user-data$', name_f): +                    found_new = True +                    print(name_f) +                    self.assertEqual(permissions, '400') + +        self.assertFalse(found_new) + +    def test_vendor_data_not_default(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['sdc:vendor-data'], +                         dsrc.metadata['vendor-data']) + +    def test_default_vendor_data(self): +        my_returns = MOCK_RETURNS.copy() +        def_op_script = my_returns['sdc:vendor-data'] +        del my_returns['sdc:vendor-data'] +        dsrc = self._get_ds(mockdata=my_returns) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data']) + +        # we expect default vendor-data is a boothook +        self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook")) + +    def test_disable_iptables_flag(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['disable_iptables_flag'], +                         dsrc.metadata['iptables_disable']) + +    def test_motd_sys_info(self): +        dsrc = self._get_ds(mockdata=MOCK_RETURNS) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'], +                         dsrc.metadata['motd_sys_info']) + +    def test_default_ephemeral(self): +        # Test to make sure that the builtin config has the ephemeral +        # configuration. +        dsrc = self._get_ds() +        cfg = dsrc.get_config_obj() + +        ret = dsrc.get_data() +        self.assertTrue(ret) + +        assert 'disk_setup' in cfg +        assert 'fs_setup' in cfg +        self.assertIsInstance(cfg['disk_setup'], dict) +        self.assertIsInstance(cfg['fs_setup'], list) + +    def test_override_disk_aliases(self): +        # Test to make sure that the built-in DS is overriden +        builtin = DataSourceSmartOS.BUILTIN_DS_CONFIG + +        mydscfg = {'disk_aliases': {'FOO': '/dev/bar'}} + +        # expect that these values are in builtin, or this is pointless +        for k in mydscfg: +            self.assertIn(k, builtin) + +        dsrc = self._get_ds(ds_cfg=mydscfg) +        ret = dsrc.get_data() +        self.assertTrue(ret) + +        self.assertEqual(mydscfg['disk_aliases']['FOO'], +                         dsrc.ds_cfg['disk_aliases']['FOO']) + +        self.assertEqual(dsrc.device_name_to_device('FOO'), +                         mydscfg['disk_aliases']['FOO']) + + +class OldTestSmartOSDataSource(FilesystemMockingTestCase):      def setUp(self):          super(TestSmartOSDataSource, self).setUp() @@ -141,7 +411,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          super(TestSmartOSDataSource, self).setUp()      def tearDown(self): -        helpers.FilesystemMockingTestCase.tearDown(self) +        FilesystemMockingTestCase.tearDown(self)          if self._log_handler and self._log:              self._log.removeHandler(self._log_handler)          apply_patches([i for i in reversed(self.unapply)]) @@ -166,7 +436,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          if dmi_data is None:              dmi_data = DMI_DATA_RETURN -        def _dmi_data(): +        def _dmi_data(item):              return dmi_data          def _os_uname(): @@ -188,12 +458,12 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])          self.apply_patches([              (mod, 'JoyentMetadataClient', get_mock_client(mockdata))]) -        self.apply_patches([(mod, 'dmi_data', _dmi_data)]) +        self.apply_patches([(util, 'read_dmi_data', _dmi_data)])          self.apply_patches([(os, 'uname', _os_uname)]) -        self.apply_patches([(mod, 'device_exists', lambda d: True)]) +        self.apply_patches([(os.path, 'exists', lambda d: True)])          dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,                                       paths=self.paths) -        self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())]) +        #self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())])          return dsrc      def test_seed(self): @@ -492,7 +762,7 @@ def apply_patches(patches):      return ret -class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): +class TestJoyentMetadataClient(FilesystemMockingTestCase):      def setUp(self):          super(TestJoyentMetadataClient, self).setUp() @@ -532,7 +802,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):                         mock.Mock(return_value=self.request_id)))      def _get_client(self): -        return DataSourceSmartOS.JoyentMetadataClient(self.serial) +        return DataSourceSmartOS.JoyentMetadataSerialClient(self.serial)      def assertEndsWith(self, haystack, prefix):          self.assertTrue(haystack.endswith(prefix), @@ -546,7 +816,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):      def test_get_metadata_writes_a_single_line(self):          client = self._get_client() -        client.get_metadata('some_key') +        client.get('some_key')          self.assertEqual(1, self.serial.write.call_count)          written_line = self.serial.write.call_args[0][0]          print(type(written_line)) @@ -556,7 +826,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):      def _get_written_line(self, key='some_key'):          client = self._get_client() -        client.get_metadata(key) +        client.get(key)          return self.serial.write.call_args[0][0]      def test_get_metadata_writes_bytes(self): @@ -600,12 +870,12 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):      def test_get_metadata_reads_a_line(self):          client = self._get_client() -        client.get_metadata('some_key') +        client.get('some_key')          self.assertEqual(self.metasource_data_len, self.serial.read.call_count)      def test_get_metadata_returns_valid_value(self):          client = self._get_client() -        value = client.get_metadata('some_key') +        value = client.get('some_key')          self.assertEqual(self.metadata_value, value)      def test_get_metadata_throws_exception_for_incorrect_length(self): @@ -633,4 +903,4 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):          self.response_parts['length'] = 17          client = self._get_client()          client._checksum = lambda _: self.response_parts['crc'] -        self.assertIsNone(client.get_metadata('some_key')) +        self.assertIsNone(client.get('some_key'))  | 
