diff options
Diffstat (limited to 'tests/unittests/test_datasource/test_smartos.py')
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 88 |
1 files changed, 13 insertions, 75 deletions
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index c79cf3aa..39991cc2 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -40,7 +40,7 @@ import six from cloudinit import helpers as c_helpers from cloudinit.sources import DataSourceSmartOS -from cloudinit.util import b64d, b64e +from cloudinit.util import b64e from .. import helpers @@ -66,76 +66,15 @@ MOCK_RETURNS = { DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc') -def _checksum(body): - return '{0:08x}'.format(crc32(body.encode('utf-8')) & 0xffffffff) - - -def _generate_v2_frame(request_id, command, body=None): - body_parts = [request_id, command] - if body: - body_parts.append(b64e(body)) - message_body = ' '.join(body_parts) - return 'V2 {0} {1} {2}\n'.format( - len(message_body), _checksum(message_body), message_body).encode( - 'ascii') - - -def _parse_v2_frame(line): - line = line.decode('ascii') - if not line.endswith('\n'): - raise Exception('Frames must end with a newline.') - version, length, checksum, body = line.strip().split(' ', 3) - if version != 'V2': - raise Exception('Frames must begin with V2.') - if int(length) != len(body): - raise Exception('Incorrect frame length given ({0} != {1}).'.format( - length, len(body))) - expected_checksum = _checksum(body) - if checksum != expected_checksum: - raise Exception('Invalid checksum.') - request_id, command, payload = body.split() - return request_id, command, b64d(payload) - - -class MockSerial(object): - """Fake a serial terminal for testing the code that - interfaces with the serial""" - - port = None - - def __init__(self, mockdata): - self.last = None - self.last = None - self.new = True - self.count = 0 - self.mocked_out = [] - self.mockdata = mockdata - - def open(self): - return True - - def close(self): - return True - - def isOpen(self): - return True - - def write(self, line): - self.last = line - - def readline(self): - if self.last == '\n': - return 'invalid command\n' - elif self.last == 'NEGOTIATE V2\n': - return 'V2_OK\n' - request_id, command, request_body = _parse_v2_frame(self.last) - if command != 'GET': - raise Exception('MockSerial only supports GET requests.') - metadata_key = request_body.strip() - if metadata_key in self.mockdata: - return _generate_v2_frame( - request_id, 'SUCCESS', self.mockdata[metadata_key]) - return _generate_v2_frame(request_id, 'NOTFOUND') +def get_mock_client(mockdata): + class MockMetadataClient(object): + + def __init__(self, serial): + pass + + def get_metadata(self, metadata_key): + return mockdata.get(metadata_key) + return MockMetadataClient class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): @@ -183,9 +122,6 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): if dmi_data is None: dmi_data = DMI_DATA_RETURN - def _get_serial(*_): - return MockSerial(mockdata) - def _dmi_data(): return dmi_data @@ -202,7 +138,9 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): sys_cfg['datasource']['SmartOS'] = ds_cfg self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)]) - self.apply_patches([(mod, 'get_serial', _get_serial)]) + self.apply_patches([(mod, 'get_serial', mock.MagicMock())]) + self.apply_patches([ + (mod, 'JoyentMetadataClient', get_mock_client(mockdata))]) self.apply_patches([(mod, 'dmi_data', _dmi_data)]) self.apply_patches([(os, 'uname', _os_uname)]) self.apply_patches([(mod, 'device_exists', lambda d: True)]) |