diff options
| author | Scott Moser <smoser@ubuntu.com> | 2016-04-04 12:07:19 -0400 | 
|---|---|---|
| committer | Scott Moser <smoser@ubuntu.com> | 2016-04-04 12:07:19 -0400 | 
| commit | 7d8a3194552387fa9e21216bcd9a3bfc76fa2b04 (patch) | |
| tree | c8dc45b013208a4e5e09e6ade63b3b5994f80aa3 /tests/unittests/test_datasource/test_smartos.py | |
| parent | 93f5af9f5075a416c65c1d0350c374e16f32f0d5 (diff) | |
| parent | 210b041b2fead7a57af91f60a6f89d9e5aa1ed4a (diff) | |
| download | vyos-cloud-init-7d8a3194552387fa9e21216bcd9a3bfc76fa2b04.tar.gz vyos-cloud-init-7d8a3194552387fa9e21216bcd9a3bfc76fa2b04.zip | |
merge with trunk
Diffstat (limited to 'tests/unittests/test_datasource/test_smartos.py')
| -rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 284 | 
1 files changed, 210 insertions, 74 deletions
| diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 65675106..5c49966a 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -22,15 +22,30 @@  #   return responses.  # -import base64 -from cloudinit import helpers as c_helpers -from cloudinit.sources import DataSourceSmartOS -from .. import helpers +from __future__ import print_function +  import os  import os.path  import re +import shutil  import stat +import tempfile  import uuid +from binascii import crc32 + +import serial +import six + +from cloudinit import helpers as c_helpers +from cloudinit.sources import DataSourceSmartOS +from cloudinit.util import b64e + +from .. import helpers + +try: +    from unittest import mock +except ImportError: +    import mock  MOCK_RETURNS = {      'hostname': 'test-host', @@ -41,77 +56,34 @@ MOCK_RETURNS = {      'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),      'sdc:datacenter_name': 'somewhere2',      'sdc:operator-script': '\n'.join(['bin/true', '']), +    'sdc:uuid': str(uuid.uuid4()),      'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),      'user-data': '\n'.join(['something', '']),      'user-script': '\n'.join(['/bin/true', '']),  } -DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc') - - -class MockSerial(object): -    """Fake a serial terminal for testing the code that -        interfaces with the serial""" - -    port = None - -    def __init__(self, mockdata): -        self.last = None -        self.last = None -        self.new = True -        self.count = 0 -        self.mocked_out = [] -        self.mockdata = mockdata - -    def open(self): -        return True - -    def close(self): -        return True - -    def isOpen(self): -        return True +DMI_DATA_RETURN = 'smartdc' -    def write(self, line): -        line = line.replace('GET ', '') -        self.last = line.rstrip() -    def readline(self): -        if self.new: -            self.new = False -            if self.last in self.mockdata: -                return 'SUCCESS\n' -            else: -                return 'NOTFOUND %s\n' % self.last - -        if self.last in self.mockdata: -            if not self.mocked_out: -                self.mocked_out = [x for x in self._format_out()] - -            if len(self.mocked_out) > self.count: -                self.count += 1 -                return self.mocked_out[self.count - 1] +def get_mock_client(mockdata): +    class MockMetadataClient(object): -    def _format_out(self): -        if self.last in self.mockdata: -            _mret = self.mockdata[self.last] -            try: -                for l in _mret.splitlines(): -                    yield "%s\n" % l.rstrip() -            except: -                yield "%s\n" % _mret.rstrip() +        def __init__(self, serial): +            pass -            yield '.' -            yield '\n' +        def get_metadata(self, metadata_key): +            return mockdata.get(metadata_key) +    return MockMetadataClient  class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):      def setUp(self): -        helpers.FilesystemMockingTestCase.setUp(self) +        super(TestSmartOSDataSource, self).setUp() -        # makeDir comes from MockerTestCase -        self.tmp = self.makeDir() -        self.legacy_user_d = self.makeDir() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp) +        self.legacy_user_d = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.legacy_user_d)          # If you should want to watch the logs...          self._log = None @@ -140,7 +112,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          ret = apply_patches(patches)          self.unapply += ret -    def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None): +    def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None, +                is_lxbrand=False):          mod = DataSourceSmartOS          if mockdata is None: @@ -149,16 +122,17 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          if dmi_data is None:              dmi_data = DMI_DATA_RETURN -        def _get_serial(*_): -            return MockSerial(mockdata) -          def _dmi_data():              return dmi_data          def _os_uname(): -            # LP: #1243287. tests assume this runs, but running test on -            # arm would cause them all to fail. -            return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64') +            if not is_lxbrand: +                # LP: #1243287. tests assume this runs, but running test on +                # arm would cause them all to fail. +                return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64') +            else: +                return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX', +                        'X86_64')          if sys_cfg is None:              sys_cfg = {} @@ -168,12 +142,14 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):              sys_cfg['datasource']['SmartOS'] = ds_cfg          self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)]) -        self.apply_patches([(mod, 'get_serial', _get_serial)]) +        self.apply_patches([ +            (mod, 'JoyentMetadataClient', get_mock_client(mockdata))])          self.apply_patches([(mod, 'dmi_data', _dmi_data)])          self.apply_patches([(os, 'uname', _os_uname)])          self.apply_patches([(mod, 'device_exists', lambda d: True)])          dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,                                       paths=self.paths) +        self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())])          return dsrc      def test_seed(self): @@ -181,14 +157,29 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          dsrc = self._get_ds()          ret = dsrc.get_data()          self.assertTrue(ret) +        self.assertEquals('kvm', dsrc.smartos_type)          self.assertEquals('/dev/ttyS1', dsrc.seed) +    def test_seed_lxbrand(self): +        # default seed should be /dev/ttyS1 +        dsrc = self._get_ds(is_lxbrand=True) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertEquals('lx-brand', dsrc.smartos_type) +        self.assertEquals('/native/.zonecontrol/metadata.sock', dsrc.seed) +      def test_issmartdc(self):          dsrc = self._get_ds()          ret = dsrc.get_data()          self.assertTrue(ret)          self.assertTrue(dsrc.is_smartdc) +    def test_issmartdc_lxbrand(self): +        dsrc = self._get_ds(is_lxbrand=True) +        ret = dsrc.get_data() +        self.assertTrue(ret) +        self.assertTrue(dsrc.is_smartdc) +      def test_no_base64(self):          ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}          dsrc = self._get_ds(ds_cfg=ds_cfg) @@ -199,7 +190,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          dsrc = self._get_ds(mockdata=MOCK_RETURNS)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id']) +        self.assertEquals(MOCK_RETURNS['sdc:uuid'], +                          dsrc.metadata['instance-id'])      def test_root_keys(self):          dsrc = self._get_ds(mockdata=MOCK_RETURNS) @@ -227,7 +219,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns = MOCK_RETURNS.copy()          my_returns['base64_all'] = "true"          for k in ('hostname', 'cloud-init:user-data'): -            my_returns[k] = base64.b64encode(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() @@ -248,7 +240,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns['b64-cloud-init:user-data'] = "true"          my_returns['b64-hostname'] = "true"          for k in ('hostname', 'cloud-init:user-data'): -            my_returns[k] = base64.b64encode(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() @@ -264,7 +256,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns = MOCK_RETURNS.copy()          my_returns['base64_keys'] = 'hostname,ignored'          for k in ('hostname',): -            my_returns[k] = base64.b64encode(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() @@ -365,7 +357,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):                  permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]                  if re.match(r'.*\/mdata-user-data$', name_f):                      found_new = True -                    print name_f +                    print(name_f)                      self.assertEquals(permissions, '400')          self.assertFalse(found_new) @@ -447,3 +439,147 @@ def apply_patches(patches):          setattr(ref, name, replace)          ret.append((ref, name, orig))      return ret + + +class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): + +    def setUp(self): +        super(TestJoyentMetadataClient, self).setUp() +        self.serial = mock.MagicMock(spec=serial.Serial) +        self.request_id = 0xabcdef12 +        self.metadata_value = 'value' +        self.response_parts = { +            'command': 'SUCCESS', +            'crc': 'b5a9ff00', +            'length': 17 + len(b64e(self.metadata_value)), +            'payload': b64e(self.metadata_value), +            'request_id': '{0:08x}'.format(self.request_id), +        } + +        def make_response(): +            payloadstr = '' +            if 'payload' in self.response_parts: +                payloadstr = ' {0}'.format(self.response_parts['payload']) +            return ('V2 {length} {crc} {request_id} ' +                    '{command}{payloadstr}\n'.format( +                        payloadstr=payloadstr, +                        **self.response_parts).encode('ascii')) + +        self.metasource_data = None + +        def read_response(length): +            if not self.metasource_data: +                self.metasource_data = make_response() +                self.metasource_data_len = len(self.metasource_data) +            resp = self.metasource_data[:length] +            self.metasource_data = self.metasource_data[length:] +            return resp + +        self.serial.read.side_effect = read_response +        self.patched_funcs.enter_context( +            mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint', +                       mock.Mock(return_value=self.request_id))) + +    def _get_client(self): +        return DataSourceSmartOS.JoyentMetadataClient(self.serial) + +    def assertEndsWith(self, haystack, prefix): +        self.assertTrue(haystack.endswith(prefix), +                        "{0} does not end with '{1}'".format( +                            repr(haystack), prefix)) + +    def assertStartsWith(self, haystack, prefix): +        self.assertTrue(haystack.startswith(prefix), +                        "{0} does not start with '{1}'".format( +                            repr(haystack), prefix)) + +    def test_get_metadata_writes_a_single_line(self): +        client = self._get_client() +        client.get_metadata('some_key') +        self.assertEqual(1, self.serial.write.call_count) +        written_line = self.serial.write.call_args[0][0] +        print(type(written_line)) +        self.assertEndsWith(written_line.decode('ascii'), +                            b'\n'.decode('ascii')) +        self.assertEqual(1, written_line.count(b'\n')) + +    def _get_written_line(self, key='some_key'): +        client = self._get_client() +        client.get_metadata(key) +        return self.serial.write.call_args[0][0] + +    def test_get_metadata_writes_bytes(self): +        self.assertIsInstance(self._get_written_line(), six.binary_type) + +    def test_get_metadata_line_starts_with_v2(self): +        foo = self._get_written_line() +        self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii')) + +    def test_get_metadata_uses_get_command(self): +        parts = self._get_written_line().decode('ascii').strip().split(' ') +        self.assertEqual('GET', parts[4]) + +    def test_get_metadata_base64_encodes_argument(self): +        key = 'my_key' +        parts = self._get_written_line(key).decode('ascii').strip().split(' ') +        self.assertEqual(b64e(key), parts[5]) + +    def test_get_metadata_calculates_length_correctly(self): +        parts = self._get_written_line().decode('ascii').strip().split(' ') +        expected_length = len(' '.join(parts[3:])) +        self.assertEqual(expected_length, int(parts[1])) + +    def test_get_metadata_uses_appropriate_request_id(self): +        parts = self._get_written_line().decode('ascii').strip().split(' ') +        request_id = parts[3] +        self.assertEqual(8, len(request_id)) +        self.assertEqual(request_id, request_id.lower()) + +    def test_get_metadata_uses_random_number_for_request_id(self): +        line = self._get_written_line() +        request_id = line.decode('ascii').strip().split(' ')[3] +        self.assertEqual('{0:08x}'.format(self.request_id), request_id) + +    def test_get_metadata_checksums_correctly(self): +        parts = self._get_written_line().decode('ascii').strip().split(' ') +        expected_checksum = '{0:08x}'.format( +            crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff) +        checksum = parts[2] +        self.assertEqual(expected_checksum, checksum) + +    def test_get_metadata_reads_a_line(self): +        client = self._get_client() +        client.get_metadata('some_key') +        self.assertEqual(self.metasource_data_len, self.serial.read.call_count) + +    def test_get_metadata_returns_valid_value(self): +        client = self._get_client() +        value = client.get_metadata('some_key') +        self.assertEqual(self.metadata_value, value) + +    def test_get_metadata_throws_exception_for_incorrect_length(self): +        self.response_parts['length'] = 0 +        client = self._get_client() +        self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, +                          client.get_metadata, 'some_key') + +    def test_get_metadata_throws_exception_for_incorrect_crc(self): +        self.response_parts['crc'] = 'deadbeef' +        client = self._get_client() +        self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, +                          client.get_metadata, 'some_key') + +    def test_get_metadata_throws_exception_for_request_id_mismatch(self): +        self.response_parts['request_id'] = 'deadbeef' +        client = self._get_client() +        client._checksum = lambda _: self.response_parts['crc'] +        self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, +                          client.get_metadata, 'some_key') + +    def test_get_metadata_returns_None_if_value_not_found(self): +        self.response_parts['payload'] = '' +        self.response_parts['command'] = 'NOTFOUND' +        self.response_parts['length'] = 17 +        client = self._get_client() +        client._checksum = lambda _: self.response_parts['crc'] +        self.assertIsNone(client.get_metadata('some_key')) | 
