diff --git a/AUTHORS b/AUTHORS index c16bdd4f..c516f091 100644 --- a/AUTHORS +++ b/AUTHORS @@ -16,6 +16,7 @@ Guido van Rossum : creator of the asyncio project and autho Gustavo Carneiro Jeff Quast Jonathan Slenders +Mark Korenberg Nikolay Kim Richard Oudkerk Saúl Ibarra Corretgé diff --git a/asyncio/compat.py b/asyncio/compat.py index 4790bb4a..7c145bcc 100644 --- a/asyncio/compat.py +++ b/asyncio/compat.py @@ -1,5 +1,7 @@ """Compatibility helpers for the different Python versions.""" +import os +import socket import sys PY34 = sys.version_info >= (3, 4) @@ -16,3 +18,20 @@ def flatten_list_bytes(list_of_data): bytes(data) if isinstance(data, memoryview) else data for data in list_of_data) return b''.join(list_of_data) + + +try: + SC_IOV_MAX = os.sysconf(os.sysconf_names['SC_IOV_MAX']) +except (KeyError, AttributeError): + # Windows-version have no os.sysconf() at all. + # It is not defined how IOV-related syscalls are limited + # when SC_IOV_MAX is missing. + # MacOS X, FreeBSD and Linux typically have value of 1024 + SC_IOV_MAX = 16 + +assert SC_IOV_MAX >= 16 + + +# Python for Windows still have no socket.sendmsg() +# http://bugs.python.org/issue27149 +HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py index ed2b4d75..e42fbcfc 100644 --- a/asyncio/selector_events.py +++ b/asyncio/selector_events.py @@ -9,6 +9,7 @@ import collections import errno import functools +import itertools import socket import warnings try: @@ -502,7 +503,8 @@ class _SelectorTransport(transports._FlowControlMixin, max_size = 256 * 1024 # Buffer size passed to recv(). - _buffer_factory = bytearray # Constructs initial value for self._buffer. + # Constructs initial value for self._buffer. + _buffer_factory = collections.deque # Attribute used in the destructor: it must be set even if the constructor # is not called (see _SelectorSslTransport which may start by raising an @@ -524,6 +526,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): self._protocol_connected = True self._server = server self._buffer = self._buffer_factory() + self._buffer_size = 0 # Cached count of bytes in buffer self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. if self._server is not None: @@ -569,6 +572,7 @@ def close(self): self._closing = True self._loop.remove_reader(self._sock_fd) if not self._buffer: + assert self._buffer_size == 0 self._conn_lost += 1 self._loop.remove_writer(self._sock_fd) self._loop.call_soon(self._call_connection_lost, None) @@ -600,7 +604,9 @@ def _force_close(self, exc): if self._conn_lost: return if self._buffer: + assert self._buffer_size > 0 self._buffer.clear() + self._buffer_size = 0 self._loop.remove_writer(self._sock_fd) if not self._closing: self._closing = True @@ -623,7 +629,7 @@ def _call_connection_lost(self, exc): self._server = None def get_write_buffer_size(self): - return len(self._buffer) + return self._buffer_size class _SelectorSocketTransport(_SelectorTransport): @@ -703,6 +709,7 @@ def write(self, data): return if not self._buffer: + assert self._buffer_size == 0 # Optimization: try to send now. try: n = self._sock.send(data) @@ -712,45 +719,106 @@ def write(self, data): self._fatal_error(exc, 'Fatal write error on socket transport') return else: - data = data[n:] - if not data: + if n == len(data): return + if n > 0: + # memoryview is required to eliminate memory copying while + # slicing. "unused" memory will be freed when this chunk is + # completely written + if not isinstance(data, memoryview): + data = memoryview(data) + data = data[n:] + # Not all was written; register write handler. self._loop.add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. - self._buffer.extend(data) + self._buffer.append(data) + self._buffer_size += len(data) self._maybe_pause_protocol() def _write_ready(self): - assert self._buffer, 'Data should not be empty' + assert self._buffer and self._buffer_size > 0, 'Data must not be empty' if self._conn_lost: return try: - n = self._sock.send(self._buffer) + if compat.HAS_SENDMSG: + # Note, this is not length in bytes, it is count of chunks in + # buffer + if len(self._buffer) > compat.SC_IOV_MAX: + # In order to minimize syscalls used for IO, we should call + # sendmsg() again and again until self._buffer is sent, + # or sendmsg() say that only part of requested data is sent. + # This will make code more complex. Instead, we assume that + # unsent part of self._buffer will be sent on next event loop + # iteration, even if one can be sent right now. + # According to benchmarking, this minor misoptimization does + # not hurt much. + buffers = itertools.islice(self._buffer, 0, compat.SC_IOV_MAX) + else: + buffers = self._buffer + n = self._sock.sendmsg(buffers) + else: + if len(self._buffer) > 1: + # 1. Flattening buffers here in order not to copy memory + # everytime chunk is added. + # 2. Also it is possible to remove flattening, and send just + # first chunk, but this will be probably slower. + data = compat.flatten_list_bytes(self._buffer) + # self._buffer_size is not changed in that operation + self._buffer.clear() + self._buffer.append(data) + n = self._sock.send(self._buffer[0]) except (BlockingIOError, InterruptedError): - pass + return except Exception as exc: self._loop.remove_writer(self._sock_fd) self._buffer.clear() + self._buffer_size = 0 self._fatal_error(exc, 'Fatal write error on socket transport') - else: - if n: - del self._buffer[:n] - self._maybe_resume_protocol() # May append to buffer. - if not self._buffer: - self._loop.remove_writer(self._sock_fd) - if self._closing: - self._call_connection_lost(None) - elif self._eof: - self._sock.shutdown(socket.SHUT_WR) + return + + assert 0 <= n <= self._buffer_size + + while n > 0: + chunk = self._buffer.popleft() + chunk_len = len(chunk) + self._buffer_size -= chunk_len + n -= chunk_len + + if n < 0: + # only part of chunk was written, so push unread part of it + # back to _buffer. + # memoryview is required to eliminate memory copying while + # slicing. "unused" memory will be freed when this chunk is + # completely written + if not isinstance(chunk, memoryview): + chunk = memoryview(chunk) + chunk = chunk[n:] + self._buffer.appendleft(chunk) + self._buffer_size += len(chunk) + + self._maybe_resume_protocol() # May append to buffer. + + if self._buffer: + assert self._buffer_size > 0 + return + + assert self._buffer_size == 0 + + self._loop.remove_writer(self._sock_fd) + if self._closing: + self._call_connection_lost(None) + elif self._eof: + self._sock.shutdown(socket.SHUT_WR) def write_eof(self): if self._eof: return self._eof = True if not self._buffer: + assert self._buffer_size == 0 self._sock.shutdown(socket.SHUT_WR) def can_write_eof(self): @@ -759,6 +827,8 @@ def can_write_eof(self): class _SelectorSslTransport(_SelectorTransport): + # Unfortunatelly, SSLSocket does not provide neither sendmsg() nor writev() + # TODO: benchmark merging buffers in SSL socket as opposed to in Python _buffer_factory = bytearray def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None, @@ -904,6 +974,7 @@ def _read_ready(self): self._write_ready() if self._buffer: + assert self._buffer_size > 0 self._loop.add_writer(self._sock_fd, self._write_ready) try: @@ -941,6 +1012,7 @@ def _write_ready(self): self._loop.add_reader(self._sock_fd, self._read_ready) if self._buffer: + assert self._buffer_size > 0 try: n = self._sock.send(self._buffer) except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError): @@ -952,15 +1024,20 @@ def _write_ready(self): except Exception as exc: self._loop.remove_writer(self._sock_fd) self._buffer.clear() + self._buffer_size = 0 self._fatal_error(exc, 'Fatal write error on SSL transport') return + assert 0 <= n <= self._buffer_size + if n: del self._buffer[:n] + self._buffer_size -= n self._maybe_resume_protocol() # May append to buffer. if not self._buffer: + assert self._buffer_size == 0 self._loop.remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) @@ -979,10 +1056,12 @@ def write(self, data): return if not self._buffer: + assert self._buffer_size == 0 self._loop.add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. self._buffer.extend(data) + self._buffer_size += len(data) self._maybe_pause_protocol() def can_write_eof(self): @@ -991,8 +1070,6 @@ def can_write_eof(self): class _SelectorDatagramTransport(_SelectorTransport): - _buffer_factory = collections.deque - def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): super().__init__(loop, sock, protocol, extra) @@ -1006,9 +1083,6 @@ def __init__(self, loop, sock, protocol, address=None, self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None) - def get_write_buffer_size(self): - return sum(len(data) for data, _ in self._buffer) - def _read_ready(self): if self._conn_lost: return @@ -1041,6 +1115,7 @@ def sendto(self, data, addr=None): return if not self._buffer: + assert self._buffer_size == 0 # Attempt to send it right away first. try: if self._address: @@ -1058,13 +1133,18 @@ def sendto(self, data, addr=None): 'Fatal write error on datagram transport') return - # Ensure that what we buffer is immutable. - self._buffer.append((bytes(data), addr)) + self._buffer.append((data, addr)) + self._buffer_size += len(data) self._maybe_pause_protocol() def _sendto_ready(self): + # Python 3.5 does not exports sendmmsg() syscall + # http://bugs.python.org/issue27022 + # So we use own loop instead of in-kernel one. while self._buffer: + assert self._buffer_size > 0 data, addr = self._buffer.popleft() + self._buffer_size -= len(data) try: if self._address: self._sock.send(data) @@ -1072,6 +1152,7 @@ def _sendto_ready(self): self._sock.sendto(data, addr) except (BlockingIOError, InterruptedError): self._buffer.appendleft((data, addr)) # Try again later. + self._buffer_size += len(data) break except OSError as exc: self._protocol.error_received(exc) @@ -1083,6 +1164,7 @@ def _sendto_ready(self): self._maybe_resume_protocol() # May append to buffer. if not self._buffer: + assert self._buffer_size == 0 self._loop.remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) diff --git a/asyncio/transports.py b/asyncio/transports.py index 9a6d9197..081bc202 100644 --- a/asyncio/transports.py +++ b/asyncio/transports.py @@ -87,7 +87,19 @@ def write(self, data): This does not block; it buffers the data and arranges for it to be sent out asynchronously. + + If passed argument is bytearray, it is not allowed to change the size + of that object while write is in progress. Violation of that lead + to undefined behaviour. """ + + # In order to lock size of bytearray, we may wrap bytearray to + # memoryview, but this affects performance. Changing size of passed + # buffer after calling write() is very rare, obscure and suspicious + # logic, and even wrapping to memoryview will not help, since exception + # in user code will be raised when user changes size of passed + # bytearray. + raise NotImplementedError def writelines(self, list_of_data): @@ -140,7 +152,7 @@ class Transport(ReadTransport, WriteTransport): connection_made() method, passing it the transport. The implementation here raises NotImplemented for every method - except writelines(), which calls write() in a loop. + except writelines(). """ diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index d712749e..ec637b7b 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -1,6 +1,8 @@ """Selector event loop for Unix with signal handling.""" +import collections import errno +import itertools import os import signal import socket @@ -430,7 +432,8 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): "pipes, sockets and character devices") _set_nonblocking(self._fileno) self._protocol = protocol - self._buffer = [] + self._buffer = collections.deque() + self._buffer_size = 0 # Cached count of bytes in buffer self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. @@ -475,21 +478,21 @@ def __repr__(self): return '<%s>' % ' '.join(info) def get_write_buffer_size(self): - return sum(len(data) for data in self._buffer) + return self._buffer_size def _read_ready(self): # Pipe was closed by peer. if self._loop.get_debug(): logger.info("%r was closed by peer", self) if self._buffer: + assert self._buffer_size > 0 self._close(BrokenPipeError()) else: self._close() def write(self, data): assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) - if isinstance(data, bytearray): - data = memoryview(data) + if not data: return @@ -501,6 +504,7 @@ def write(self, data): return if not self._buffer: + assert self._buffer_size == 0 # Attempt to send it right away first. try: n = os.write(self._fileno, data) @@ -513,39 +517,86 @@ def write(self, data): if n == len(data): return elif n > 0: + # memoryview is required to eliminate memory copying while + # slicing. "unused" memory will be freed when this chunk is + # completely written + if not isinstance(data, memoryview): + data = memoryview(data) data = data[n:] self._loop.add_writer(self._fileno, self._write_ready) self._buffer.append(data) + self._buffer_size += len(data) self._maybe_pause_protocol() def _write_ready(self): - data = b''.join(self._buffer) - assert data, 'Data should not be empty' + # writing empty chunks to buffer is disallowed earlier + assert self._buffer and self._buffer_size > 0, 'Data must not be empty' - self._buffer.clear() try: - n = os.write(self._fileno, data) + # Note, this is not length in bytes, it is count of chunks in + # buffer + if len(self._buffer) > compat.SC_IOV_MAX: + # In order to minimize syscalls used for IO, we should call + # writev() again and again until self._buffer is sent, + # or writev() say that only part of requested data is sent. + # This will make code more complex. Instead, we assume that + # unsent part of self._buffer will be sent on next event loop + # iteration, even if one can be sent right now. + # According to benchmarking, this minor misoptimization does + # not hurt much. + buffers = itertools.islice(self._buffer, 0, compat.SC_IOV_MAX) + + # Unfortunatelly, Python 3.5 have the BUG - writev() does not + # accept generators. So, we wrap it in a list. + # http://bugs.python.org/issue27020 + buffers = list(buffers) + else: + buffers = self._buffer + n = os.writev(self._fileno, buffers) except (BlockingIOError, InterruptedError): - self._buffer.append(data) + return except Exception as exc: self._conn_lost += 1 # Remove writer here, _fatal_error() doesn't it # because _buffer is empty. self._loop.remove_writer(self._fileno) self._fatal_error(exc, 'Fatal write error on pipe transport') - else: - if n == len(data): - self._loop.remove_writer(self._fileno) - self._maybe_resume_protocol() # May append to buffer. - if not self._buffer and self._closing: - self._loop.remove_reader(self._fileno) - self._call_connection_lost(None) - return - elif n > 0: - data = data[n:] + return + + assert 0 <= n <= self._buffer_size - self._buffer.append(data) # Try again later. + while n > 0: + chunk = self._buffer.popleft() + chunk_length = len(chunk) + self._buffer_size -= chunk_length + n -= chunk_length + + if n < 0: + # only part of chunk was written, so push unread part of it + # back to _buffer. + # memoryview is required to eliminate memory copying while + # slicing. "unused" memory will be freed when this chunk is + # completely written + if not isinstance(chunk, memoryview): + chunk = memoryview(chunk) + chunk = chunk[n:] + self._buffer.appendleft(chunk) + self._buffer_size += len(chunk) + + self._maybe_resume_protocol() # May append to buffer. + + if self._buffer: + assert self._buffer_size > 0 + return + + assert self._buffer_size == 0 + + self._loop.remove_writer(self._fileno) + + if self._closing: + self._loop.remove_reader(self._fileno) + self._call_connection_lost(None) def can_write_eof(self): return True @@ -556,6 +607,7 @@ def write_eof(self): assert self._pipe self._closing = True if not self._buffer: + assert self._buffer_size == 0 self._loop.remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, None) @@ -596,8 +648,10 @@ def _fatal_error(self, exc, message='Fatal error on pipe transport'): def _close(self, exc=None): self._closing = True if self._buffer: + assert self._buffer_size > 0 self._loop.remove_writer(self._fileno) self._buffer.clear() + self._buffer_size = 0 self._loop.remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, exc) diff --git a/examples/bench_streams.py b/examples/bench_streams.py new file mode 100755 index 00000000..89ac3b11 --- /dev/null +++ b/examples/bench_streams.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3.5 + +import asyncio +import os +import time + + +async def read_stream_until_end(stream, bufsize): + while True: + d = await stream.read(bufsize) + if not d: + break + + +async def write_to_stream(stream, bufsize, count, drain_after_each_buf): + # unfortunatelly, sometimes writes zeroes to disk are optimized by FS + buf = b'x' * bufsize + for i in range(count): + stream.write(buf) + if drain_after_each_buf: + await stream.drain() + await stream.drain() + stream.close() + + +def bench(fun): + # todo: @wraps(fun) + async def newfun(bufsize, count, drain_after_each_buf): + a = time.monotonic() + await fun(bufsize, count, drain_after_each_buf) + b = time.monotonic() + bufsize /= 1024 * 1024 + print('{}: {} buffers by {:.2f} MB {}: {:.2f} seconds ({:.2f} MB/sec)'.format( + fun.__name__, + count, + bufsize, + '(with intermediate drains)' if drain_after_each_buf else '', + b - a, + bufsize * count / (b - a), + )) + return newfun + + +async def connect_write_pipe(fd): + loop = asyncio.get_event_loop() + transport, protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, open(fd, 'wb')) + stream_writer = asyncio.StreamWriter(transport, protocol, None, loop) + return stream_writer, transport + + +async def connect_read_pipe(fd): + loop = asyncio.get_event_loop() + stream_reader = asyncio.StreamReader() + transport, protocol = await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(stream_reader), open(fd, 'rb')) + return stream_reader, transport + +@bench +async def bench_pipes(bufsize, count, drain_after_each_buf): + (r, w) = os.pipe() + (stream_reader, rtransport) = await connect_read_pipe(r) + (stream_writer, wtransport) = await connect_write_pipe(w) + await asyncio.gather( + write_to_stream(stream_writer, bufsize, count, drain_after_each_buf), + # by default, pipes can not read more than 65536 bytes. + # unless fcntl(pipe_fd, F_SETPIPE_SZ, ...) is called + read_stream_until_end(stream_reader, 65536), + ) + wtransport.close() + rtransport.close() + +@bench +async def bench_sockets(bufsize, count, drain_after_each_buf): + handler = lambda rs, ws: write_to_stream(ws, bufsize, count, drain_after_each_buf) + server = await asyncio.start_server(handler, '127.0.0.1', 0) + + addr = server.sockets[0].getsockname() + (rs, ws) = await asyncio.open_connection(*addr) + server.close() + await server.wait_closed() + + # now writing part is running "in background". TODO: wait it completion somehow + await read_stream_until_end(rs, 10 * 1024 * 1024) + ws.close() + + +async def amain(): + N = 5 + for drain in (False, True): + await bench_pipes(10 * 1024 * 1024, N, drain) + await bench_pipes(10 * 1024, N * 1024, drain) + + await bench_sockets(10 * 1024 * 1024, N, drain) + await bench_sockets(10 * 1024, N * 1024, drain) + + +def main(): + loop = asyncio.get_event_loop() + loop.run_until_complete(amain()) + + +if __name__ == '__main__': + main() diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py index ff71c218..fc7dbecf 100644 --- a/tests/test_selector_events.py +++ b/tests/test_selector_events.py @@ -21,6 +21,7 @@ MOCK_ANY = mock.ANY +HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') class TestBaseSelectorEventLoop(BaseSelectorEventLoop): @@ -39,6 +40,21 @@ def list_to_buffer(l=()): return bytearray().join(l) +def dgram_prefill_buffer(stream, data, addr): + stream._buffer.append((data, addr)) + stream._buffer_size += len(data) + + +def ssl_prefill_buffer(stream, data): + stream._buffer.extend(data) + stream._buffer_size += len(data) + + +def sock_prefill_buffer(stream, data): + stream._buffer.append(data) + stream._buffer_size += len(data) + + def close_transport(transport): # Don't call transport.close() because the event loop and the selector # are mocked @@ -730,7 +746,7 @@ def test_close(self): def test_close_write_buffer(self): tr = self.create_transport() - tr._buffer.extend(b'data') + sock_prefill_buffer(tr, b'data') tr.close() self.assertFalse(self.loop.readers) @@ -739,13 +755,13 @@ def test_close_write_buffer(self): def test_force_close(self): tr = self.create_transport() - tr._buffer.extend(b'1') + sock_prefill_buffer(tr, b'1') self.loop.add_reader(7, mock.sentinel) self.loop.add_writer(7, mock.sentinel) tr._force_close(None) self.assertTrue(tr.is_closing()) - self.assertEqual(tr._buffer, list_to_buffer()) + self.assertSequenceEqual([], tr._buffer) self.assertFalse(self.loop.readers) self.assertFalse(self.loop.writers) @@ -925,18 +941,17 @@ def test_write_memoryview(self): def test_write_no_data(self): transport = self.socket_transport() - transport._buffer.extend(b'data') + sock_prefill_buffer(transport, b'data') transport.write(b'') self.assertFalse(self.sock.send.called) - self.assertEqual(list_to_buffer([b'data']), transport._buffer) + self.assertSequenceEqual([b'data'], transport._buffer) def test_write_buffer(self): transport = self.socket_transport() - transport._buffer.extend(b'data1') + sock_prefill_buffer(transport, b'data1') transport.write(b'data2') self.assertFalse(self.sock.send.called) - self.assertEqual(list_to_buffer([b'data1', b'data2']), - transport._buffer) + self.assertSequenceEqual([b'data1', b'data2'], transport._buffer) def test_write_partial(self): data = b'data' @@ -946,7 +961,7 @@ def test_write_partial(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + self.assertSequenceEqual([b'ta'], transport._buffer) def test_write_partial_bytearray(self): data = bytearray(b'data') @@ -956,7 +971,7 @@ def test_write_partial_bytearray(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + self.assertSequenceEqual([b'ta'], transport._buffer) self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated. def test_write_partial_memoryview(self): @@ -967,7 +982,7 @@ def test_write_partial_memoryview(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + self.assertSequenceEqual([b'ta'], transport._buffer) def test_write_partial_none(self): data = b'data' @@ -978,7 +993,7 @@ def test_write_partial_none(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data']), transport._buffer) + self.assertSequenceEqual([b'data'], transport._buffer) def test_write_tryagain(self): self.sock.send.side_effect = BlockingIOError @@ -988,7 +1003,7 @@ def test_write_tryagain(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data']), transport._buffer) + self.assertSequenceEqual([b'data'], transport._buffer) @mock.patch('asyncio.selector_events.logger') def test_write_exception(self, m_log): @@ -1026,25 +1041,33 @@ def test_write_closing(self): def test_write_ready(self): data = b'data' - self.sock.send.return_value = len(data) + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.return_value = len(data) transport = self.socket_transport() - transport._buffer.extend(data) + sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() - self.assertTrue(self.sock.send.called) + self.assertTrue(sendfun.called) self.assertFalse(self.loop.writers) def test_write_ready_closing(self): data = b'data' - self.sock.send.return_value = len(data) + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.return_value = len(data) transport = self.socket_transport() transport._closing = True - transport._buffer.extend(data) + sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() - self.assertTrue(self.sock.send.called) + self.assertTrue(sendfun.called) self.assertFalse(self.loop.writers) self.sock.close.assert_called_with() self.protocol.connection_lost.assert_called_with(None) @@ -1056,43 +1079,63 @@ def test_write_ready_no_data(self): def test_write_ready_partial(self): data = b'data' - self.sock.send.return_value = 2 + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.return_value = 2 transport = self.socket_transport() - transport._buffer.extend(data) + sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + self.assertSequenceEqual([b'ta'], transport._buffer) def test_write_ready_partial_none(self): data = b'data' - self.sock.send.return_value = 0 + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.return_value = 0 transport = self.socket_transport() - transport._buffer.extend(data) + sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data']), transport._buffer) + self.assertSequenceEqual([b'data'], transport._buffer) def test_write_ready_tryagain(self): - self.sock.send.side_effect = BlockingIOError + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.side_effect = BlockingIOError transport = self.socket_transport() - transport._buffer = list_to_buffer([b'data1', b'data2']) + sock_prefill_buffer(transport, b'data1') + sock_prefill_buffer(transport, b'data2') self.loop.add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer) + if HAS_SENDMSG: + self.assertSequenceEqual([b'data1', b'data2'], transport._buffer) + else: + self.assertSequenceEqual([b'data1data2'], transport._buffer) def test_write_ready_exception(self): - err = self.sock.send.side_effect = OSError() + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + err = sendfun.side_effect = OSError() transport = self.socket_transport() transport._fatal_error = mock.Mock() - transport._buffer.extend(b'data') + sock_prefill_buffer(transport, b'data') transport._write_ready() transport._fatal_error.assert_called_with( err, @@ -1109,15 +1152,23 @@ def test_write_eof(self): def test_write_eof_buffer(self): tr = self.socket_transport() - self.sock.send.side_effect = BlockingIOError + if HAS_SENDMSG: + sendfun1 = self.sock.send + sendfun2 = self.sock.sendmsg + else: + sendfun1 = self.sock.send + sendfun2 = self.sock.send + sendfun1.side_effect = BlockingIOError + tr.write(b'data') tr.write_eof() - self.assertEqual(tr._buffer, list_to_buffer([b'data'])) + self.assertSequenceEqual([b'data'], tr._buffer) self.assertTrue(tr._eof) self.assertFalse(self.sock.shutdown.called) - self.sock.send.side_effect = lambda _: 4 + + sendfun2.side_effect = lambda _: 4 tr._write_ready() - self.assertTrue(self.sock.send.called) + self.assertTrue(sendfun2.called) self.sock.shutdown.assert_called_with(socket.SHUT_WR) tr.close() @@ -1246,7 +1297,7 @@ def test_write_memoryview(self): def test_write_no_data(self): transport = self._make_one() - transport._buffer.extend(b'data') + ssl_prefill_buffer(transport, b'data') transport.write(b'') self.assertEqual(list_to_buffer([b'data']), transport._buffer) @@ -1286,7 +1337,7 @@ def test_read_ready_write_wants_read(self): transport = self._make_one() transport._write_wants_read = True transport._write_ready = mock.Mock() - transport._buffer.extend(b'data') + ssl_prefill_buffer(transport, b'data') transport._read_ready() self.assertFalse(transport._write_wants_read) @@ -1350,7 +1401,7 @@ def test_read_ready_recv_exc(self): def test_write_ready_send(self): self.sslsock.send.return_value = 4 transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') transport._write_ready() self.assertEqual(list_to_buffer(), transport._buffer) self.assertTrue(self.sslsock.send.called) @@ -1358,7 +1409,8 @@ def test_write_ready_send(self): def test_write_ready_send_none(self): self.sslsock.send.return_value = 0 transport = self._make_one() - transport._buffer = list_to_buffer([b'data1', b'data2']) + ssl_prefill_buffer(transport, b'data1') + ssl_prefill_buffer(transport, b'data2') transport._write_ready() self.assertTrue(self.sslsock.send.called) self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer) @@ -1366,7 +1418,8 @@ def test_write_ready_send_none(self): def test_write_ready_send_partial(self): self.sslsock.send.return_value = 2 transport = self._make_one() - transport._buffer = list_to_buffer([b'data1', b'data2']) + ssl_prefill_buffer(transport, b'data1') + ssl_prefill_buffer(transport, b'data2') transport._write_ready() self.assertTrue(self.sslsock.send.called) self.assertEqual(list_to_buffer([b'ta1data2']), transport._buffer) @@ -1374,7 +1427,8 @@ def test_write_ready_send_partial(self): def test_write_ready_send_closing_partial(self): self.sslsock.send.return_value = 2 transport = self._make_one() - transport._buffer = list_to_buffer([b'data1', b'data2']) + ssl_prefill_buffer(transport, b'data1') + ssl_prefill_buffer(transport, b'data2') transport._write_ready() self.assertTrue(self.sslsock.send.called) self.assertFalse(self.sslsock.close.called) @@ -1382,7 +1436,7 @@ def test_write_ready_send_closing_partial(self): def test_write_ready_send_closing(self): self.sslsock.send.return_value = 4 transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') transport.close() transport._write_ready() self.protocol.connection_lost.assert_called_with(None) @@ -1391,14 +1445,13 @@ def test_write_ready_send_closing_empty_buffer(self): self.sslsock.send.return_value = 4 call_soon = self.loop.call_soon = mock.Mock() transport = self._make_one() - transport._buffer = list_to_buffer() transport.close() transport._write_ready() call_soon.assert_called_with(transport._call_connection_lost, None) def test_write_ready_send_retry(self): transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') self.sslsock.send.side_effect = ssl.SSLWantWriteError transport._write_ready() @@ -1410,7 +1463,7 @@ def test_write_ready_send_retry(self): def test_write_ready_send_read(self): transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') self.loop.remove_writer = mock.Mock() self.sslsock.send.side_effect = ssl.SSLWantReadError @@ -1423,7 +1476,7 @@ def test_write_ready_send_exc(self): err = self.sslsock.send.side_effect = OSError() transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') transport._fatal_error = mock.Mock() transport._write_ready() transport._fatal_error.assert_called_with( @@ -1571,45 +1624,45 @@ def test_sendto_memoryview(self): def test_sendto_no_data(self): transport = self.datagram_transport() - transport._buffer.append((b'data', ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, b'data', ('0.0.0.0', 12345)) transport.sendto(b'', ()) self.assertFalse(self.sock.sendto.called) - self.assertEqual( - [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) + self.assertSequenceEqual( + [(b'data', ('0.0.0.0', 12345))], transport._buffer) def test_sendto_buffer(self): transport = self.datagram_transport() - transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, b'data1', ('0.0.0.0', 12345)) transport.sendto(b'data2', ('0.0.0.0', 12345)) self.assertFalse(self.sock.sendto.called) - self.assertEqual( + self.assertSequenceEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], - list(transport._buffer)) + transport._buffer) def test_sendto_buffer_bytearray(self): data2 = bytearray(b'data2') transport = self.datagram_transport() - transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, b'data1', ('0.0.0.0', 12345)) transport.sendto(data2, ('0.0.0.0', 12345)) self.assertFalse(self.sock.sendto.called) - self.assertEqual( + self.assertSequenceEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], - list(transport._buffer)) - self.assertIsInstance(transport._buffer[1][0], bytes) + transport._buffer) + self.assertIsInstance(transport._buffer[1][0], bytearray) def test_sendto_buffer_memoryview(self): data2 = memoryview(b'data2') transport = self.datagram_transport() - transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, b'data1', ('0.0.0.0', 12345)) transport.sendto(data2, ('0.0.0.0', 12345)) self.assertFalse(self.sock.sendto.called) - self.assertEqual( + self.assertSequenceEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], - list(transport._buffer)) - self.assertIsInstance(transport._buffer[1][0], bytes) + transport._buffer) + self.assertIsInstance(transport._buffer[1][0], memoryview) def test_sendto_tryagain(self): data = b'data' @@ -1620,8 +1673,8 @@ def test_sendto_tryagain(self): transport.sendto(data, ('0.0.0.0', 12345)) self.loop.assert_writer(7, transport._sendto_ready) - self.assertEqual( - [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) + self.assertSequenceEqual( + [(b'data', ('0.0.0.0', 12345))], transport._buffer) @mock.patch('asyncio.selector_events.logger') def test_sendto_exception(self, m_log): @@ -1691,7 +1744,7 @@ def test_sendto_ready(self): self.sock.sendto.return_value = len(data) transport = self.datagram_transport() - transport._buffer.append((data, ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, data, ('0.0.0.0', 12345)) self.loop.add_writer(7, transport._sendto_ready) transport._sendto_ready() self.assertTrue(self.sock.sendto.called) @@ -1705,7 +1758,7 @@ def test_sendto_ready_closing(self): transport = self.datagram_transport() transport._closing = True - transport._buffer.append((data, ())) + dgram_prefill_buffer(transport, data, ()) self.loop.add_writer(7, transport._sendto_ready) transport._sendto_ready() self.sock.sendto.assert_called_with(data, ()) @@ -1724,21 +1777,22 @@ def test_sendto_ready_tryagain(self): self.sock.sendto.side_effect = BlockingIOError transport = self.datagram_transport() - transport._buffer.extend([(b'data1', ()), (b'data2', ())]) + dgram_prefill_buffer(transport, b'data1', ()) + dgram_prefill_buffer(transport, b'data2', ()) self.loop.add_writer(7, transport._sendto_ready) transport._sendto_ready() self.loop.assert_writer(7, transport._sendto_ready) - self.assertEqual( + self.assertSequenceEqual( [(b'data1', ()), (b'data2', ())], - list(transport._buffer)) + transport._buffer) def test_sendto_ready_exception(self): err = self.sock.sendto.side_effect = RuntimeError() transport = self.datagram_transport() transport._fatal_error = mock.Mock() - transport._buffer.append((b'data', ())) + dgram_prefill_buffer(transport, b'data', ()) transport._sendto_ready() transport._fatal_error.assert_called_with( @@ -1750,7 +1804,7 @@ def test_sendto_ready_error_received(self): transport = self.datagram_transport() transport._fatal_error = mock.Mock() - transport._buffer.append((b'data', ())) + dgram_prefill_buffer(transport, b'data', ()) transport._sendto_ready() self.assertFalse(transport._fatal_error.called) @@ -1760,7 +1814,7 @@ def test_sendto_ready_error_received_connection(self): transport = self.datagram_transport(address=('0.0.0.0', 1)) transport._fatal_error = mock.Mock() - transport._buffer.append((b'data', ())) + dgram_prefill_buffer(transport, b'data', ()) transport._sendto_ready() self.assertFalse(transport._fatal_error.called) diff --git a/tests/test_unix_events.py b/tests/test_unix_events.py index 22dc6880..0f161d70 100644 --- a/tests/test_unix_events.py +++ b/tests/test_unix_events.py @@ -1,6 +1,7 @@ """Tests for unix_events.py.""" import collections +import contextlib import errno import io import os @@ -35,6 +36,54 @@ def close_pipe_transport(transport): transport._pipe = None +@contextlib.contextmanager +def patched_os_writev(): + """Patch os.writev() in a specific way. + + The unittest.mock stores arguments by reference. So if argument is mutable + and changed after tested call, we can not check arguments values correctly. + + Example: + + buf = [1, 2, 3] + writev(buf) # mock remembers reference to buf, not contents. + buf.pop() + ... + writev.assert_called_with([1, 2, 3]) # will fail + + Note, that writev() was called with [1,2,3], but mock see (in assert) that + buf is [1, 2] + """ + + class Wrapper: + def __init__(self, wrapped): + self._wrapped = wrapped + + def __call__(self, fd, buffers): + return self._wrapped(fd, list(buffers)) + + def __getattr__(self, name): + if name.startswith('_'): + return object.__getattr__(self, name) + else: + return getattr(self._wrapped, name) + + def __setattr__(self, name, val): + if name.startswith('_'): + object.__setattr__(self, name, val) + else: + setattr(self._wrapped, name, val) + + m_writev = Wrapper(mock.Mock()) + with mock.patch('os.writev', m_writev): + yield m_writev + + +def pipe_prefill_buffer(stream, data): + stream._buffer.append(data) + stream._buffer_size += len(data) + + @unittest.skipUnless(signal, 'Signals are not supported') class SelectorEventLoopSignalTests(test_utils.TestCase): @@ -518,7 +567,7 @@ def test_write(self, m_write): tr.write(b'data') m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertSequenceEqual([], tr._buffer) @mock.patch('os.write') def test_write_no_data(self, m_write): @@ -526,7 +575,7 @@ def test_write_no_data(self, m_write): tr.write(b'') self.assertFalse(m_write.called) self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertSequenceEqual([], tr._buffer) @mock.patch('os.write') def test_write_partial(self, m_write): @@ -535,17 +584,17 @@ def test_write_partial(self, m_write): tr.write(b'data') m_write.assert_called_with(5, b'data') self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'ta'], tr._buffer) + self.assertSequenceEqual([b'ta'], tr._buffer) @mock.patch('os.write') def test_write_buffer(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'previous'] + pipe_prefill_buffer(tr, b'previous') tr.write(b'data') self.assertFalse(m_write.called) self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'previous', b'data'], tr._buffer) + self.assertSequenceEqual([b'previous', b'data'], tr._buffer) @mock.patch('os.write') def test_write_again(self, m_write): @@ -554,7 +603,7 @@ def test_write_again(self, m_write): tr.write(b'data') m_write.assert_called_with(5, b'data') self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) + self.assertSequenceEqual([b'data'], tr._buffer) @mock.patch('asyncio.unix_events.logger') @mock.patch('os.write') @@ -566,7 +615,7 @@ def test_write_err(self, m_write, m_log): tr.write(b'data') m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertSequenceEqual([], tr._buffer) tr._fatal_error.assert_called_with( err, 'Fatal write error on pipe transport') @@ -602,98 +651,105 @@ def test__read_ready(self): test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) - @mock.patch('os.write') - def test__write_ready(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.return_value = 4 - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) - - @mock.patch('os.write') - def test__write_ready_partial(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.return_value = 3 - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'a'], tr._buffer) - - @mock.patch('os.write') - def test__write_ready_again(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.side_effect = BlockingIOError() - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) - - @mock.patch('os.write') - def test__write_ready_empty(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.return_value = 0 - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) + def test__write_ready(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.return_value = 4 + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.assertFalse(self.loop.writers) + self.assertSequenceEqual([], tr._buffer) + + def test__write_ready_partial(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.return_value = 3 + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.loop.assert_writer(5, tr._write_ready) + self.assertSequenceEqual([b'a'], tr._buffer) + + def test__write_ready_again(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.side_effect = BlockingIOError() + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.loop.assert_writer(5, tr._write_ready) + self.assertSequenceEqual([b'da', b'ta'], tr._buffer) + + def test__write_ready_empty(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.return_value = 0 + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.loop.assert_writer(5, tr._write_ready) + self.assertSequenceEqual([b'da', b'ta'], tr._buffer) @mock.patch('asyncio.log.logger.error') - @mock.patch('os.write') - def test__write_ready_err(self, m_write, m_logexc): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.side_effect = err = OSError() - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.assertFalse(self.loop.writers) - self.assertFalse(self.loop.readers) - self.assertEqual([], tr._buffer) - self.assertTrue(tr.is_closing()) - m_logexc.assert_called_with( - test_utils.MockPattern( - 'Fatal write error on pipe transport' - '\nprotocol:.*\ntransport:.*'), - exc_info=(OSError, MOCK_ANY, MOCK_ANY)) - self.assertEqual(1, tr._conn_lost) - test_utils.run_briefly(self.loop) - self.protocol.connection_lost.assert_called_with(err) - - @mock.patch('os.write') - def test__write_ready_closing(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._closing = True - tr._buffer = [b'da', b'ta'] - m_write.return_value = 4 - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.assertFalse(self.loop.writers) - self.assertFalse(self.loop.readers) - self.assertEqual([], tr._buffer) - self.protocol.connection_lost.assert_called_with(None) - self.pipe.close.assert_called_with() + def test__write_ready_err(self, m_logexc): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.side_effect = err = OSError() + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.assertFalse(self.loop.writers) + self.assertFalse(self.loop.readers) + self.assertSequenceEqual([], tr._buffer) + self.assertTrue(tr.is_closing()) + m_logexc.assert_called_with( + test_utils.MockPattern( + 'Fatal write error on pipe transport' + '\nprotocol:.*\ntransport:.*'), + exc_info=(OSError, MOCK_ANY, MOCK_ANY)) + self.assertEqual(1, tr._conn_lost) + test_utils.run_briefly(self.loop) + self.protocol.connection_lost.assert_called_with(err) + + def test__write_ready_closing(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + tr._closing = True + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.return_value = 4 + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.assertFalse(self.loop.writers) + self.assertFalse(self.loop.readers) + self.assertSequenceEqual([], tr._buffer) + self.protocol.connection_lost.assert_called_with(None) + self.pipe.close.assert_called_with() @mock.patch('os.write') def test_abort(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) self.loop.add_reader(5, tr._read_ready) - tr._buffer = [b'da', b'ta'] + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') tr.abort() self.assertFalse(m_write.called) self.assertFalse(self.loop.readers) self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertSequenceEqual([], tr._buffer) self.assertTrue(tr.is_closing()) test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) @@ -750,7 +806,7 @@ def test_write_eof(self): def test_write_eof_pending(self): tr = self.write_pipe_transport() - tr._buffer = [b'data'] + pipe_prefill_buffer(tr, b'data') tr.write_eof() self.assertTrue(tr.is_closing()) self.assertFalse(self.protocol.connection_lost.called)