summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_smartos.py88
1 files changed, 13 insertions, 75 deletions
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index c79cf3aa..39991cc2 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -40,7 +40,7 @@ import six
from cloudinit import helpers as c_helpers
from cloudinit.sources import DataSourceSmartOS
-from cloudinit.util import b64d, b64e
+from cloudinit.util import b64e
from .. import helpers
@@ -66,76 +66,15 @@ MOCK_RETURNS = {
DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
-def _checksum(body):
- return '{0:08x}'.format(crc32(body.encode('utf-8')) & 0xffffffff)
-
-
-def _generate_v2_frame(request_id, command, body=None):
- body_parts = [request_id, command]
- if body:
- body_parts.append(b64e(body))
- message_body = ' '.join(body_parts)
- return 'V2 {0} {1} {2}\n'.format(
- len(message_body), _checksum(message_body), message_body).encode(
- 'ascii')
-
-
-def _parse_v2_frame(line):
- line = line.decode('ascii')
- if not line.endswith('\n'):
- raise Exception('Frames must end with a newline.')
- version, length, checksum, body = line.strip().split(' ', 3)
- if version != 'V2':
- raise Exception('Frames must begin with V2.')
- if int(length) != len(body):
- raise Exception('Incorrect frame length given ({0} != {1}).'.format(
- length, len(body)))
- expected_checksum = _checksum(body)
- if checksum != expected_checksum:
- raise Exception('Invalid checksum.')
- request_id, command, payload = body.split()
- return request_id, command, b64d(payload)
-
-
-class MockSerial(object):
- """Fake a serial terminal for testing the code that
- interfaces with the serial"""
-
- port = None
-
- def __init__(self, mockdata):
- self.last = None
- self.last = None
- self.new = True
- self.count = 0
- self.mocked_out = []
- self.mockdata = mockdata
-
- def open(self):
- return True
-
- def close(self):
- return True
-
- def isOpen(self):
- return True
-
- def write(self, line):
- self.last = line
-
- def readline(self):
- if self.last == '\n':
- return 'invalid command\n'
- elif self.last == 'NEGOTIATE V2\n':
- return 'V2_OK\n'
- request_id, command, request_body = _parse_v2_frame(self.last)
- if command != 'GET':
- raise Exception('MockSerial only supports GET requests.')
- metadata_key = request_body.strip()
- if metadata_key in self.mockdata:
- return _generate_v2_frame(
- request_id, 'SUCCESS', self.mockdata[metadata_key])
- return _generate_v2_frame(request_id, 'NOTFOUND')
+def get_mock_client(mockdata):
+ class MockMetadataClient(object):
+
+ def __init__(self, serial):
+ pass
+
+ def get_metadata(self, metadata_key):
+ return mockdata.get(metadata_key)
+ return MockMetadataClient
class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
@@ -183,9 +122,6 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
if dmi_data is None:
dmi_data = DMI_DATA_RETURN
- def _get_serial(*_):
- return MockSerial(mockdata)
-
def _dmi_data():
return dmi_data
@@ -202,7 +138,9 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
- self.apply_patches([(mod, 'get_serial', _get_serial)])
+ self.apply_patches([(mod, 'get_serial', mock.MagicMock())])
+ self.apply_patches([
+ (mod, 'JoyentMetadataClient', get_mock_client(mockdata))])
self.apply_patches([(mod, 'dmi_data', _dmi_data)])
self.apply_patches([(os, 'uname', _os_uname)])
self.apply_patches([(mod, 'device_exists', lambda d: True)])