diff options
-rw-r--r-- | cloudinit/sources/DataSourceSmartOS.py | 117 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 103 |
2 files changed, 204 insertions, 16 deletions
diff --git a/cloudinit/sources/DataSourceSmartOS.py b/cloudinit/sources/DataSourceSmartOS.py index 5dd8a125..c8998b40 100644 --- a/cloudinit/sources/DataSourceSmartOS.py +++ b/cloudinit/sources/DataSourceSmartOS.py @@ -1,4 +1,5 @@ # Copyright (C) 2013 Canonical Ltd. +# Copyright (c) 2018, Joyent, Inc. # # Author: Ben Howard <ben.howard@canonical.com> # @@ -21,6 +22,7 @@ import base64 import binascii +import errno import json import os import random @@ -229,6 +231,9 @@ class DataSourceSmartOS(sources.DataSource): self.md_client) return False + # Open once for many requests, rather than once for each request + self.md_client.open_transport() + for ci_noun, attribute in SMARTOS_ATTRIB_MAP.items(): smartos_noun, strip = attribute md[ci_noun] = self.md_client.get(smartos_noun, strip=strip) @@ -236,6 +241,8 @@ class DataSourceSmartOS(sources.DataSource): for ci_noun, smartos_noun in SMARTOS_ATTRIB_JSON.items(): md[ci_noun] = self.md_client.get_json(smartos_noun) + self.md_client.close_transport() + # @datadictionary: This key may contain a program that is written # to a file in the filesystem of the guest on each boot and then # executed. It may be of any format that would be considered @@ -316,6 +323,10 @@ class JoyentMetadataFetchException(Exception): pass +class JoyentMetadataTimeoutException(JoyentMetadataFetchException): + pass + + class JoyentMetadataClient(object): """ A client implementing v2 of the Joyent Metadata Protocol Specification. @@ -360,6 +371,47 @@ class JoyentMetadataClient(object): LOG.debug('Value "%s" found.', value) return value + def _readline(self): + """ + Reads a line a byte at a time until \n is encountered. Returns an + ascii string with the trailing newline removed. + + If a timeout (per-byte) is set and it expires, a + JoyentMetadataFetchException will be thrown. + """ + response = [] + + def as_ascii(): + return b''.join(response).decode('ascii') + + msg = "Partial response: '%s'" + while True: + try: + byte = self.fp.read(1) + if len(byte) == 0: + raise JoyentMetadataTimeoutException(msg % as_ascii()) + if byte == b'\n': + return as_ascii() + response.append(byte) + except OSError as exc: + if exc.errno == errno.EAGAIN: + raise JoyentMetadataTimeoutException(msg % as_ascii()) + raise + + def _write(self, msg): + self.fp.write(msg.encode('ascii')) + self.fp.flush() + + def _negotiate(self): + LOG.debug('Negotiating protocol V2') + self._write('NEGOTIATE V2\n') + response = self._readline() + LOG.debug('read "%s"', response) + if response != 'V2_OK': + raise JoyentMetadataFetchException( + 'Invalid response "%s" to "NEGOTIATE V2"' % response) + LOG.debug('Negotiation complete') + def request(self, rtype, param=None): request_id = '{0:08x}'.format(random.randint(0, 0xffffffff)) message_body = ' '.join((request_id, rtype,)) @@ -374,18 +426,11 @@ class JoyentMetadataClient(object): self.open_transport() need_close = True - self.fp.write(msg.encode('ascii')) - self.fp.flush() - - response = bytearray() - response.extend(self.fp.read(1)) - while response[-1:] != b'\n': - response.extend(self.fp.read(1)) - + self._write(msg) + response = self._readline() if need_close: self.close_transport() - response = response.rstrip().decode('ascii') LOG.debug('Read "%s" from metadata transport.', response) if 'SUCCESS' not in response: @@ -450,6 +495,7 @@ class JoyentMetadataSocketClient(JoyentMetadataClient): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(self.socketpath) self.fp = sock.makefile('rwb') + self._negotiate() def exists(self): return os.path.exists(self.socketpath) @@ -459,8 +505,9 @@ class JoyentMetadataSocketClient(JoyentMetadataClient): class JoyentMetadataSerialClient(JoyentMetadataClient): - def __init__(self, device, timeout=10, smartos_type=SMARTOS_ENV_KVM): - super(JoyentMetadataSerialClient, self).__init__(smartos_type) + def __init__(self, device, timeout=10, smartos_type=SMARTOS_ENV_KVM, + fp=None): + super(JoyentMetadataSerialClient, self).__init__(smartos_type, fp) self.device = device self.timeout = timeout @@ -468,10 +515,50 @@ class JoyentMetadataSerialClient(JoyentMetadataClient): return os.path.exists(self.device) def open_transport(self): - ser = serial.Serial(self.device, timeout=self.timeout) - if not ser.isOpen(): - raise SystemError("Unable to open %s" % self.device) - self.fp = ser + if self.fp is None: + ser = serial.Serial(self.device, timeout=self.timeout) + if not ser.isOpen(): + raise SystemError("Unable to open %s" % self.device) + self.fp = ser + self._flush() + self._negotiate() + + def _flush(self): + LOG.debug('Flushing input') + # Read any pending data + timeout = self.fp.timeout + self.fp.timeout = 0.1 + while True: + try: + self._readline() + except JoyentMetadataTimeoutException: + break + LOG.debug('Input empty') + + # Send a newline and expect "invalid command". Keep trying until + # successful. Retry rather frequently so that the "Is the host + # metadata service running" appears on the console soon after someone + # attaches in an effort to debug. + if timeout > 5: + self.fp.timeout = 5 + else: + self.fp.timeout = timeout + while True: + LOG.debug('Writing newline, expecting "invalid command"') + self._write('\n') + try: + response = self._readline() + if response == 'invalid command': + break + if response == 'FAILURE': + LOG.debug('Got "FAILURE". Retrying.') + continue + LOG.warning('Unexpected response "%s" during flush', response) + except JoyentMetadataTimeoutException: + LOG.warning('Timeout while initializing metadata client. ' + + 'Is the host metadata service running?') + LOG.debug('Got "invalid command". Flush complete.') + self.fp.timeout = timeout def __repr__(self): return "%s(device=%s, timeout=%s)" % ( diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 88bae5f9..2bea7a17 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -1,4 +1,5 @@ # Copyright (C) 2013 Canonical Ltd. +# Copyright (c) 2018, Joyent, Inc. # # Author: Ben Howard <ben.howard@canonical.com> # @@ -324,6 +325,7 @@ class PsuedoJoyentClient(object): if data is None: data = MOCK_RETURNS.copy() self.data = data + self._is_open = False return def get(self, key, default=None, strip=False): @@ -344,6 +346,14 @@ class PsuedoJoyentClient(object): def exists(self): return True + def open_transport(self): + assert(not self._is_open) + self._is_open = True + + def close_transport(self): + assert(self._is_open) + self._is_open = False + class TestSmartOSDataSource(FilesystemMockingTestCase): def setUp(self): @@ -592,8 +602,46 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): mydscfg['disk_aliases']['FOO']) +class ShortReader(object): + """Implements a 'read' interface for bytes provided. + much like io.BytesIO but the 'endbyte' acts as if EOF. + When it is reached a short will be returned.""" + def __init__(self, initial_bytes, endbyte=b'\0'): + self.data = initial_bytes + self.index = 0 + self.len = len(self.data) + self.endbyte = endbyte + + @property + def emptied(self): + return self.index >= self.len + + def read(self, size=-1): + """Read size bytes but not past a null.""" + if size == 0 or self.index >= self.len: + return b'' + + rsize = size + if size < 0 or size + self.index > self.len: + rsize = self.len - self.index + + next_null = self.data.find(self.endbyte, self.index, rsize) + if next_null >= 0: + rsize = next_null - self.index + 1 + i = self.index + self.index += rsize + ret = self.data[i:i + rsize] + if len(ret) and ret[-1:] == self.endbyte: + ret = ret[:-1] + return ret + + class TestJoyentMetadataClient(FilesystemMockingTestCase): + invalid = b'invalid command\n' + failure = b'FAILURE\n' + v2_ok = b'V2_OK\n' + def setUp(self): super(TestJoyentMetadataClient, self).setUp() @@ -636,6 +684,11 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): return DataSourceSmartOS.JoyentMetadataClient( fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM) + def _get_serial_client(self): + self.serial.timeout = 1 + return DataSourceSmartOS.JoyentMetadataSerialClient(None, + fp=self.serial) + def assertEndsWith(self, haystack, prefix): self.assertTrue(haystack.endswith(prefix), "{0} does not end with '{1}'".format( @@ -646,12 +699,14 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): "{0} does not start with '{1}'".format( repr(haystack), prefix)) + def assertNoMoreSideEffects(self, obj): + self.assertRaises(StopIteration, obj) + def test_get_metadata_writes_a_single_line(self): client = self._get_client() client.get('some_key') self.assertEqual(1, self.serial.write.call_count) written_line = self.serial.write.call_args[0][0] - print(type(written_line)) self.assertEndsWith(written_line.decode('ascii'), b'\n'.decode('ascii')) self.assertEqual(1, written_line.count(b'\n')) @@ -737,6 +792,52 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): client._checksum = lambda _: self.response_parts['crc'] self.assertIsNone(client.get('some_key')) + def test_negotiate(self): + client = self._get_client() + reader = ShortReader(self.v2_ok) + client.fp.read.side_effect = reader.read + client._negotiate() + self.assertTrue(reader.emptied) + + def test_negotiate_short_response(self): + client = self._get_client() + # chopped '\n' from v2_ok. + reader = ShortReader(self.v2_ok[:-1] + b'\0') + client.fp.read.side_effect = reader.read + self.assertRaises(DataSourceSmartOS.JoyentMetadataTimeoutException, + client._negotiate) + self.assertTrue(reader.emptied) + + def test_negotiate_bad_response(self): + client = self._get_client() + reader = ShortReader(b'garbage\n' + self.v2_ok) + client.fp.read.side_effect = reader.read + self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, + client._negotiate) + self.assertEqual(self.v2_ok, client.fp.read()) + + def test_serial_open_transport(self): + client = self._get_serial_client() + reader = ShortReader(b'garbage\0' + self.invalid + self.v2_ok) + client.fp.read.side_effect = reader.read + client.open_transport() + self.assertTrue(reader.emptied) + + def test_flush_failure(self): + client = self._get_serial_client() + reader = ShortReader(b'garbage' + b'\0' + self.failure + + self.invalid + self.v2_ok) + client.fp.read.side_effect = reader.read + client.open_transport() + self.assertTrue(reader.emptied) + + def test_flush_many_timeouts(self): + client = self._get_serial_client() + reader = ShortReader(b'\0' * 100 + self.invalid + self.v2_ok) + client.fp.read.side_effect = reader.read + client.open_transport() + self.assertTrue(reader.emptied) + class TestNetworkConversion(TestCase): def test_convert_simple(self): |