From 337c1a8d8e0ce2cbaac69a4cfe5beffbf52e7acb Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 7 Jun 2018 23:44:00 +0200 Subject: [PATCH 1/4] bpo-33694: Fix race condition on proactor recv() The cancellation of an overlapped WSARecv() has a race condition which causes data loss because of the current implementation of proactor in asyncio. No longer cancel overlapped WSARecv() in _ProactorReadPipeTransport to work around the race condition. Remove the optimized recv_into() implementation to get simple implementation of pause_reading() using the single _pending_data attribute. --- Lib/asyncio/proactor_events.py | 183 +++++++----------- Lib/test/test_asyncio/test_proactor_events.py | 4 + .../2018-06-07-23-51-00.bpo-33694.F1zIR1.rst | 2 + 3 files changed, 73 insertions(+), 116 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 337ed0fb204751..503f46064680d3 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -159,20 +159,14 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): - self._loop_reading_cb = None + self._pending_data = None self._paused = True super().__init__(loop, sock, protocol, waiter, extra, server) - self._reschedule_on_resume = False self._loop.call_soon(self._loop_reading) self._paused = False def set_protocol(self, protocol): - if isinstance(protocol, protocols.BufferedProtocol): - self._loop_reading_cb = self._loop_reading__get_buffer - else: - self._loop_reading_cb = self._loop_reading__data_received - super().set_protocol(protocol) if self.is_reading(): @@ -188,17 +182,16 @@ def pause_reading(self): return self._paused = True - if self._read_fut is not None and not self._read_fut.done(): - # TODO: This is an ugly hack to cancel the current read future - # *and* avoid potential race conditions, as read cancellation - # goes through `future.cancel()` and `loop.call_soon()`. - # We then use this special attribute in the reader callback to - # exit *immediately* without doing any cleanup/rescheduling. - self._read_fut.__asyncio_cancelled_on_pause__ = True - - self._read_fut.cancel() - self._read_fut = None - self._reschedule_on_resume = True + # bpo-33694: Don't cancel self._read_fut because cancelling an + # overlapped WSASend() loss silently data with the current proactor + # implementation. + # + # If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend() + # completed (even if HasOverlappedIoCompleted() returns 0), but + # Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND + # error. Once the overlapped is ignored, the IOCP loop will ignores the + # completion I/O event and so not read the result of the overlapped + # WSARecv(). if self._loop.get_debug(): logger.debug("%r pauses reading", self) @@ -206,14 +199,22 @@ def pause_reading(self): def resume_reading(self): if self._closing or not self._paused: return + self._paused = False - if self._reschedule_on_resume: + if self._read_fut is None: self._loop.call_soon(self._loop_reading, self._read_fut) - self._reschedule_on_resume = False + + data = self._pending_data + self._pending_data = None + if data is not None: + # Call the protocol methode after calling _loop_reading(), + # since the protocol can decide to pause reading again. + self._loop.call_soon(self._data_received, data) + if self._loop.get_debug(): logger.debug("%r resumes reading", self) - def _loop_reading__on_eof(self): + def _eof_received(self): if self._loop.get_debug(): logger.debug("%r received EOF", self) @@ -227,18 +228,45 @@ def _loop_reading__on_eof(self): if not keep_open: self.close() - def _loop_reading(self, fut=None): - self._loop_reading_cb(fut) - - def _loop_reading__data_received(self, fut): - if (fut is not None and - getattr(fut, '__asyncio_cancelled_on_pause__', False)): + def _data_received(self, data): + if self._paused: + # don't call any protocol method while reading is paused + assert self._pending_data is None + self._pending_data = data return - if self._paused: - self._reschedule_on_resume = True + if not data: + self._eof_received() return + if isinstance(self._protocol, protocols.BufferedProtocol): + nbytes = len(data) + if nbytes: + try: + buf = self._protocol.get_buffer(-1) + if not len(buf): + raise RuntimeError('get_buffer() returned ' + 'an empty buffer') + except Exception as exc: + self._fatal_error( + exc, 'Fatal error: protocol.get_buffer() call failed.') + return + + # Copy data into the buffer + buf[:nbytes] = data + + try: + self._protocol.buffer_updated(nbytes) + except Exception as exc: + self._fatal_error( + exc, + 'Fatal error: ' + 'protocol.buffer_updated() call failed.') + return + else: + self._protocol.data_received(data) + + def _loop_reading(self, fut=None): data = None try: if fut is not None: @@ -261,8 +289,12 @@ def _loop_reading__data_received(self, fut): # we got end-of-file so no need to reschedule a new read return - # reschedule a new read - self._read_fut = self._loop._proactor.recv(self._sock, 32768) + # bpo-33694: buffer_updated() has currently no fast path because + # of a data loss issue with WSASend() cancellation. + + if not self._paused: + # reschedule a new read + self._read_fut = self._loop._proactor.recv(self._sock, 32768) except ConnectionAbortedError as exc: if not self._closing: self._fatal_error(exc, 'Fatal read error on pipe transport') @@ -277,92 +309,11 @@ def _loop_reading__data_received(self, fut): if not self._closing: raise else: - self._read_fut.add_done_callback(self._loop_reading__data_received) + if not self._paused: + self._read_fut.add_done_callback(self._loop_reading) finally: - if data: - self._protocol.data_received(data) - elif data == b'': - self._loop_reading__on_eof() - - def _loop_reading__get_buffer(self, fut): - if (fut is not None and - getattr(fut, '__asyncio_cancelled_on_pause__', False)): - return - - if self._paused: - self._reschedule_on_resume = True - return - - nbytes = None - if fut is not None: - assert self._read_fut is fut or (self._read_fut is None and - self._closing) - self._read_fut = None - try: - if fut.done(): - nbytes = fut.result() - else: - # the future will be replaced by next proactor.recv call - fut.cancel() - except ConnectionAbortedError as exc: - if not self._closing: - self._fatal_error( - exc, 'Fatal read error on pipe transport') - elif self._loop.get_debug(): - logger.debug("Read error on pipe transport while closing", - exc_info=True) - except ConnectionResetError as exc: - self._force_close(exc) - except OSError as exc: - self._fatal_error(exc, 'Fatal read error on pipe transport') - except futures.CancelledError: - if not self._closing: - raise - - if nbytes is not None: - if nbytes == 0: - # we got end-of-file so no need to reschedule a new read - self._loop_reading__on_eof() - else: - try: - self._protocol.buffer_updated(nbytes) - except Exception as exc: - self._fatal_error( - exc, - 'Fatal error: ' - 'protocol.buffer_updated() call failed.') - return - - if self._closing or nbytes == 0: - # since close() has been called we ignore any read data - return - - try: - buf = self._protocol.get_buffer(-1) - if not len(buf): - raise RuntimeError('get_buffer() returned an empty buffer') - except Exception as exc: - self._fatal_error( - exc, 'Fatal error: protocol.get_buffer() call failed.') - return - - try: - # schedule a new read - self._read_fut = self._loop._proactor.recv_into(self._sock, buf) - self._read_fut.add_done_callback(self._loop_reading__get_buffer) - except ConnectionAbortedError as exc: - if not self._closing: - self._fatal_error(exc, 'Fatal read error on pipe transport') - elif self._loop.get_debug(): - logger.debug("Read error on pipe transport while closing", - exc_info=True) - except ConnectionResetError as exc: - self._force_close(exc) - except OSError as exc: - self._fatal_error(exc, 'Fatal read error on pipe transport') - except futures.CancelledError: - if not self._closing: - raise + if data is not None: + self._data_received(data) class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 26588634de04a7..ac529413c2756b 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -459,6 +459,8 @@ def test_dont_pause_writing(self): self.assertFalse(self.protocol.pause_writing.called) +@unittest.skip('FIXME: bpo-33694: these tests are too close ' + 'to the implementation and should be refactored or removed') class ProactorSocketTransportBufferedProtoTests(test_utils.TestCase): def setUp(self): @@ -551,6 +553,8 @@ def test_proto_type_switch(self): self.loop._proactor.recv_into.assert_called_with(self.sock, buf) buf_proto.buffer_updated.assert_called_with(4) + @unittest.skip('FIXME: bpo-33694: this test is too close to the ' + 'implementation and should be refactored or removed') def test_proto_buf_switch(self): tr = self.socket_transport() test_utils.run_briefly(self.loop) diff --git a/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst b/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst new file mode 100644 index 00000000000000..675c7920bf57f7 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst @@ -0,0 +1,2 @@ +asyncio: Fix a race condition causing dataloss on +pause_reading()/resume_reading() when using the ProactorEventLoop. From c74aafde3bb5628db75f4a816617e2fed80e77d4 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 8 Jun 2018 00:01:03 +0200 Subject: [PATCH 2/4] Cleanup Remove set_protocol() which became useless --- Lib/asyncio/proactor_events.py | 17 +++++------------ .../2018-06-07-23-51-00.bpo-33694.F1zIR1.rst | 2 +- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 503f46064680d3..f94b2b6aa26e26 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -166,14 +166,6 @@ def __init__(self, loop, sock, protocol, waiter=None, self._loop.call_soon(self._loop_reading) self._paused = False - def set_protocol(self, protocol): - super().set_protocol(protocol) - - if self.is_reading(): - # reset reading callback / buffers / self._read_fut - self.pause_reading() - self.resume_reading() - def is_reading(self): return not self._paused and not self._closing @@ -202,7 +194,7 @@ def resume_reading(self): self._paused = False if self._read_fut is None: - self._loop.call_soon(self._loop_reading, self._read_fut) + self._loop.call_soon(self._loop_reading, None) data = self._pending_data self._pending_data = None @@ -230,7 +222,8 @@ def _eof_received(self): def _data_received(self, data): if self._paused: - # don't call any protocol method while reading is paused + # Don't call any protocol method while reading is paused. + # The protocol will be called on resume_reading(). assert self._pending_data is None self._pending_data = data return @@ -289,8 +282,8 @@ def _loop_reading(self, fut=None): # we got end-of-file so no need to reschedule a new read return - # bpo-33694: buffer_updated() has currently no fast path because - # of a data loss issue with WSASend() cancellation. + # bpo-33694: buffer_updated() has currently no fast path because of + # a data loss issue caused by overlapped WSASend() cancellation. if not self._paused: # reschedule a new read diff --git a/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst b/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst index 675c7920bf57f7..6b7f15cf8d437e 100644 --- a/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst +++ b/Misc/NEWS.d/next/Library/2018-06-07-23-51-00.bpo-33694.F1zIR1.rst @@ -1,2 +1,2 @@ -asyncio: Fix a race condition causing dataloss on +asyncio: Fix a race condition causing data loss on pause_reading()/resume_reading() when using the ProactorEventLoop. From bf7fe3522c4321c6b4bdb8ba66d1249bcdf2976c Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 8 Jun 2018 00:08:08 +0200 Subject: [PATCH 3/4] Use sslproto._feed_data_to_bufferred_proto() --- Lib/asyncio/proactor_events.py | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index f94b2b6aa26e26..82e9bba5e59858 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -233,28 +233,12 @@ def _data_received(self, data): return if isinstance(self._protocol, protocols.BufferedProtocol): - nbytes = len(data) - if nbytes: - try: - buf = self._protocol.get_buffer(-1) - if not len(buf): - raise RuntimeError('get_buffer() returned ' - 'an empty buffer') - except Exception as exc: - self._fatal_error( - exc, 'Fatal error: protocol.get_buffer() call failed.') - return - - # Copy data into the buffer - buf[:nbytes] = data - try: - self._protocol.buffer_updated(nbytes) + sslproto._feed_data_to_bufferred_proto(self._protocol, data) except Exception as exc: - self._fatal_error( - exc, - 'Fatal error: ' - 'protocol.buffer_updated() call failed.') + self._fatal_error(exc, + 'Fatal error: protocol.buffer_updated() ' + 'call failed.') return else: self._protocol.data_received(data) From c81003b4ecdc7120a34576d30d46611901da282f Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 8 Jun 2018 00:12:04 +0200 Subject: [PATCH 4/4] Move _feed_data_to_bufferred_proto() to protocols --- Lib/asyncio/proactor_events.py | 2 +- Lib/asyncio/protocols.py | 19 +++++++++++++++++++ Lib/asyncio/sslproto.py | 21 +-------------------- Lib/test/test_asyncio/test_sslproto.py | 13 +++++++------ 4 files changed, 28 insertions(+), 27 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 82e9bba5e59858..d9cfdff02c9ae1 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -234,7 +234,7 @@ def _data_received(self, data): if isinstance(self._protocol, protocols.BufferedProtocol): try: - sslproto._feed_data_to_bufferred_proto(self._protocol, data) + protocols._feed_data_to_bufferred_proto(self._protocol, data) except Exception as exc: self._fatal_error(exc, 'Fatal error: protocol.buffer_updated() ' diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py index b8d2e6be552e1e..4d47da387caa3f 100644 --- a/Lib/asyncio/protocols.py +++ b/Lib/asyncio/protocols.py @@ -189,3 +189,22 @@ def pipe_connection_lost(self, fd, exc): def process_exited(self): """Called when subprocess has exited.""" + + +def _feed_data_to_bufferred_proto(proto, data): + data_len = len(data) + while data_len: + buf = proto.get_buffer(data_len) + buf_len = len(buf) + if not buf_len: + raise RuntimeError('get_buffer() returned an empty buffer') + + if buf_len >= data_len: + buf[:data_len] = data + proto.buffer_updated(data_len) + return + else: + buf[:buf_len] = data[:buf_len] + proto.buffer_updated(buf_len) + data = data[buf_len:] + data_len = len(data) diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py index fac2ae74e808b8..5578c6f81834ae 100644 --- a/Lib/asyncio/sslproto.py +++ b/Lib/asyncio/sslproto.py @@ -535,7 +535,7 @@ def data_received(self, data): if chunk: try: if self._app_protocol_is_buffer: - _feed_data_to_bufferred_proto( + protocols._feed_data_to_bufferred_proto( self._app_protocol, chunk) else: self._app_protocol.data_received(chunk) @@ -721,22 +721,3 @@ def _abort(self): self._transport.abort() finally: self._finalize() - - -def _feed_data_to_bufferred_proto(proto, data): - data_len = len(data) - while data_len: - buf = proto.get_buffer(data_len) - buf_len = len(buf) - if not buf_len: - raise RuntimeError('get_buffer() returned an empty buffer') - - if buf_len >= data_len: - buf[:data_len] = data - proto.buffer_updated(data_len) - return - else: - buf[:buf_len] = data[:buf_len] - proto.buffer_updated(buf_len) - data = data[buf_len:] - data_len = len(data) diff --git a/Lib/test/test_asyncio/test_sslproto.py b/Lib/test/test_asyncio/test_sslproto.py index 78ab1eb8223148..4ace48f9988894 100644 --- a/Lib/test/test_asyncio/test_sslproto.py +++ b/Lib/test/test_asyncio/test_sslproto.py @@ -11,6 +11,7 @@ import asyncio from asyncio import log +from asyncio import protocols from asyncio import sslproto from asyncio import tasks from test.test_asyncio import utils as test_utils @@ -189,28 +190,28 @@ def buffer_updated(self, nsize): for usemv in [False, True]: proto = Proto(1, usemv) - sslproto._feed_data_to_bufferred_proto(proto, b'12345') + protocols._feed_data_to_bufferred_proto(proto, b'12345') self.assertEqual(proto.data, b'12345') proto = Proto(2, usemv) - sslproto._feed_data_to_bufferred_proto(proto, b'12345') + protocols._feed_data_to_bufferred_proto(proto, b'12345') self.assertEqual(proto.data, b'12345') proto = Proto(2, usemv) - sslproto._feed_data_to_bufferred_proto(proto, b'1234') + protocols._feed_data_to_bufferred_proto(proto, b'1234') self.assertEqual(proto.data, b'1234') proto = Proto(4, usemv) - sslproto._feed_data_to_bufferred_proto(proto, b'1234') + protocols._feed_data_to_bufferred_proto(proto, b'1234') self.assertEqual(proto.data, b'1234') proto = Proto(100, usemv) - sslproto._feed_data_to_bufferred_proto(proto, b'12345') + protocols._feed_data_to_bufferred_proto(proto, b'12345') self.assertEqual(proto.data, b'12345') proto = Proto(0, usemv) with self.assertRaisesRegex(RuntimeError, 'empty buffer'): - sslproto._feed_data_to_bufferred_proto(proto, b'12345') + protocols._feed_data_to_bufferred_proto(proto, b'12345') def test_start_tls_client_reg_proto_1(self): HELLO_MSG = b'1' * self.PAYLOAD_SIZE