From f874fff34e466fbac8c244d0324795c20760f2c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D1=80=D0=B5=D0=BD=D0=B1=D0=B5=D1=80=D0=B3=20?= =?UTF-8?q?=D0=9C=D0=B0=D1=80=D0=BA=20=28imac=29?= Date: Sat, 14 May 2016 01:26:49 +0500 Subject: [PATCH 1/4] Add stream benchmarking tool --- examples/bench_streams.py | 102 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100755 examples/bench_streams.py diff --git a/examples/bench_streams.py b/examples/bench_streams.py new file mode 100755 index 00000000..89ac3b11 --- /dev/null +++ b/examples/bench_streams.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3.5 + +import asyncio +import os +import time + + +async def read_stream_until_end(stream, bufsize): + while True: + d = await stream.read(bufsize) + if not d: + break + + +async def write_to_stream(stream, bufsize, count, drain_after_each_buf): + # unfortunatelly, sometimes writes zeroes to disk are optimized by FS + buf = b'x' * bufsize + for i in range(count): + stream.write(buf) + if drain_after_each_buf: + await stream.drain() + await stream.drain() + stream.close() + + +def bench(fun): + # todo: @wraps(fun) + async def newfun(bufsize, count, drain_after_each_buf): + a = time.monotonic() + await fun(bufsize, count, drain_after_each_buf) + b = time.monotonic() + bufsize /= 1024 * 1024 + print('{}: {} buffers by {:.2f} MB {}: {:.2f} seconds ({:.2f} MB/sec)'.format( + fun.__name__, + count, + bufsize, + '(with intermediate drains)' if drain_after_each_buf else '', + b - a, + bufsize * count / (b - a), + )) + return newfun + + +async def connect_write_pipe(fd): + loop = asyncio.get_event_loop() + transport, protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, open(fd, 'wb')) + stream_writer = asyncio.StreamWriter(transport, protocol, None, loop) + return stream_writer, transport + + +async def connect_read_pipe(fd): + loop = asyncio.get_event_loop() + stream_reader = asyncio.StreamReader() + transport, protocol = await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(stream_reader), open(fd, 'rb')) + return stream_reader, transport + +@bench +async def bench_pipes(bufsize, count, drain_after_each_buf): + (r, w) = os.pipe() + (stream_reader, rtransport) = await connect_read_pipe(r) + (stream_writer, wtransport) = await connect_write_pipe(w) + await asyncio.gather( + write_to_stream(stream_writer, bufsize, count, drain_after_each_buf), + # by default, pipes can not read more than 65536 bytes. + # unless fcntl(pipe_fd, F_SETPIPE_SZ, ...) is called + read_stream_until_end(stream_reader, 65536), + ) + wtransport.close() + rtransport.close() + +@bench +async def bench_sockets(bufsize, count, drain_after_each_buf): + handler = lambda rs, ws: write_to_stream(ws, bufsize, count, drain_after_each_buf) + server = await asyncio.start_server(handler, '127.0.0.1', 0) + + addr = server.sockets[0].getsockname() + (rs, ws) = await asyncio.open_connection(*addr) + server.close() + await server.wait_closed() + + # now writing part is running "in background". TODO: wait it completion somehow + await read_stream_until_end(rs, 10 * 1024 * 1024) + ws.close() + + +async def amain(): + N = 5 + for drain in (False, True): + await bench_pipes(10 * 1024 * 1024, N, drain) + await bench_pipes(10 * 1024, N * 1024, drain) + + await bench_sockets(10 * 1024 * 1024, N, drain) + await bench_sockets(10 * 1024, N * 1024, drain) + + +def main(): + loop = asyncio.get_event_loop() + loop.run_until_complete(amain()) + + +if __name__ == '__main__': + main() From b11a68763131e55549ce68e9b7cfa474e5406d9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D1=80=D0=B5=D0=BD=D0=B1=D0=B5=D1=80=D0=B3=20?= =?UTF-8?q?=D0=9C=D0=B0=D1=80=D0=BA=20=28imac=29?= Date: Mon, 2 May 2016 23:43:24 +0500 Subject: [PATCH 2/4] Fix #338 - performance issue on stream writes For pipes, use os.writev() instead of os.write(b.join()). For sockets, use sendmsg() instead of send(b''.join()). Also change plain list of buffers to deque() of buffers. This greatly improve performance on large buffers or on many stream.write() calls. --- AUTHORS | 1 + asyncio/compat.py | 13 ++ asyncio/selector_events.py | 120 ++++++++++++++---- asyncio/transports.py | 14 +- asyncio/unix_events.py | 92 +++++++++++--- tests/test_selector_events.py | 154 ++++++++++++---------- tests/test_unix_events.py | 232 +++++++++++++++++++++------------- 7 files changed, 424 insertions(+), 202 deletions(-) diff --git a/AUTHORS b/AUTHORS index c16bdd4f..c516f091 100644 --- a/AUTHORS +++ b/AUTHORS @@ -16,6 +16,7 @@ Guido van Rossum : creator of the asyncio project and autho Gustavo Carneiro Jeff Quast Jonathan Slenders +Mark Korenberg Nikolay Kim Richard Oudkerk Saúl Ibarra Corretgé diff --git a/asyncio/compat.py b/asyncio/compat.py index 4790bb4a..61709a7c 100644 --- a/asyncio/compat.py +++ b/asyncio/compat.py @@ -1,5 +1,6 @@ """Compatibility helpers for the different Python versions.""" +import os import sys PY34 = sys.version_info >= (3, 4) @@ -16,3 +17,15 @@ def flatten_list_bytes(list_of_data): bytes(data) if isinstance(data, memoryview) else data for data in list_of_data) return b''.join(list_of_data) + + +try: + SC_IOV_MAX = os.sysconf(os.sysconf_names['SC_IOV_MAX']) +except (KeyError, AttributeError): + # Windows-version have no os.sysconf() at all. + # It is not defined how IOV-related syscalls are limited + # when SC_IOV_MAX is missing. + # MacOS X, FreeBSD and Linux typically have value of 1024 + SC_IOV_MAX = 16 + +assert SC_IOV_MAX >= 16 diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py index ed2b4d75..f6eaa919 100644 --- a/asyncio/selector_events.py +++ b/asyncio/selector_events.py @@ -9,6 +9,7 @@ import collections import errno import functools +import itertools import socket import warnings try: @@ -502,7 +503,8 @@ class _SelectorTransport(transports._FlowControlMixin, max_size = 256 * 1024 # Buffer size passed to recv(). - _buffer_factory = bytearray # Constructs initial value for self._buffer. + # Constructs initial value for self._buffer. + _buffer_factory = collections.deque # Attribute used in the destructor: it must be set even if the constructor # is not called (see _SelectorSslTransport which may start by raising an @@ -524,6 +526,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): self._protocol_connected = True self._server = server self._buffer = self._buffer_factory() + self._buffer_size = 0 # Cached count of bytes in buffer self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. if self._server is not None: @@ -569,6 +572,7 @@ def close(self): self._closing = True self._loop.remove_reader(self._sock_fd) if not self._buffer: + assert self._buffer_size == 0 self._conn_lost += 1 self._loop.remove_writer(self._sock_fd) self._loop.call_soon(self._call_connection_lost, None) @@ -600,7 +604,9 @@ def _force_close(self, exc): if self._conn_lost: return if self._buffer: + assert self._buffer_size > 0 self._buffer.clear() + self._buffer_size = 0 self._loop.remove_writer(self._sock_fd) if not self._closing: self._closing = True @@ -623,7 +629,7 @@ def _call_connection_lost(self, exc): self._server = None def get_write_buffer_size(self): - return len(self._buffer) + return self._buffer_size class _SelectorSocketTransport(_SelectorTransport): @@ -703,6 +709,7 @@ def write(self, data): return if not self._buffer: + assert self._buffer_size == 0 # Optimization: try to send now. try: n = self._sock.send(data) @@ -712,45 +719,94 @@ def write(self, data): self._fatal_error(exc, 'Fatal write error on socket transport') return else: - data = data[n:] - if not data: + if n == len(data): return + if n > 0: + # memoryview is required to eliminate memory copying while + # slicing. "unused" memory will be freed when this chunk is + # completely written + if not isinstance(data, memoryview): + data = memoryview(data) + data = data[n:] + # Not all was written; register write handler. self._loop.add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. - self._buffer.extend(data) + self._buffer.append(data) + self._buffer_size += len(data) self._maybe_pause_protocol() def _write_ready(self): - assert self._buffer, 'Data should not be empty' + assert self._buffer and self._buffer_size > 0, 'Data must not be empty' if self._conn_lost: return try: - n = self._sock.send(self._buffer) + # Note, this is not length in bytes, it is count of chunks in + # buffer + if len(self._buffer) > compat.SC_IOV_MAX: + # In order to minimize syscalls used for IO, we should call + # sendmsg() again and again until self._buffer is sent, + # or sendmsg() say that only part of requested data is sent. + # This will make code more complex. Instead, we assume that + # unsent part of self._buffer will be sent on next event loop + # iteration, even if one can be sent right now. + # According to benchmarking, this minor misoptimization does + # not hurt much. + buffers = itertools.islice(self._buffer, 0, compat.SC_IOV_MAX) + else: + buffers = self._buffer + n = self._sock.sendmsg(buffers) except (BlockingIOError, InterruptedError): - pass + return except Exception as exc: self._loop.remove_writer(self._sock_fd) self._buffer.clear() + self._buffer_size = 0 self._fatal_error(exc, 'Fatal write error on socket transport') - else: - if n: - del self._buffer[:n] - self._maybe_resume_protocol() # May append to buffer. - if not self._buffer: - self._loop.remove_writer(self._sock_fd) - if self._closing: - self._call_connection_lost(None) - elif self._eof: - self._sock.shutdown(socket.SHUT_WR) + return + + assert 0 <= n <= self._buffer_size + + while n > 0: + chunk = self._buffer.popleft() + chunk_len = len(chunk) + self._buffer_size -= chunk_len + n -= chunk_len + + if n < 0: + # only part of chunk was written, so push unread part of it + # back to _buffer. + # memoryview is required to eliminate memory copying while + # slicing. "unused" memory will be freed when this chunk is + # completely written + if not isinstance(chunk, memoryview): + chunk = memoryview(chunk) + chunk = chunk[n:] + self._buffer.appendleft(chunk) + self._buffer_size += len(chunk) + + self._maybe_resume_protocol() # May append to buffer. + + if self._buffer: + assert self._buffer_size > 0 + return + + assert self._buffer_size == 0 + + self._loop.remove_writer(self._sock_fd) + if self._closing: + self._call_connection_lost(None) + elif self._eof: + self._sock.shutdown(socket.SHUT_WR) def write_eof(self): if self._eof: return self._eof = True if not self._buffer: + assert self._buffer_size == 0 self._sock.shutdown(socket.SHUT_WR) def can_write_eof(self): @@ -759,6 +815,8 @@ def can_write_eof(self): class _SelectorSslTransport(_SelectorTransport): + # Unfortunatelly, SSLSocket does not provide neither sendmsg() nor writev() + # TODO: benchmark merging buffers in SSL socket as opposed to in Python _buffer_factory = bytearray def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None, @@ -904,6 +962,7 @@ def _read_ready(self): self._write_ready() if self._buffer: + assert self._buffer_size > 0 self._loop.add_writer(self._sock_fd, self._write_ready) try: @@ -941,6 +1000,7 @@ def _write_ready(self): self._loop.add_reader(self._sock_fd, self._read_ready) if self._buffer: + assert self._buffer_size > 0 try: n = self._sock.send(self._buffer) except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError): @@ -952,15 +1012,20 @@ def _write_ready(self): except Exception as exc: self._loop.remove_writer(self._sock_fd) self._buffer.clear() + self._buffer_size = 0 self._fatal_error(exc, 'Fatal write error on SSL transport') return + assert 0 <= n <= self._buffer_size + if n: del self._buffer[:n] + self._buffer_size -= n self._maybe_resume_protocol() # May append to buffer. if not self._buffer: + assert self._buffer_size == 0 self._loop.remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) @@ -979,10 +1044,12 @@ def write(self, data): return if not self._buffer: + assert self._buffer_size == 0 self._loop.add_writer(self._sock_fd, self._write_ready) # Add it to the buffer. self._buffer.extend(data) + self._buffer_size += len(data) self._maybe_pause_protocol() def can_write_eof(self): @@ -991,8 +1058,6 @@ def can_write_eof(self): class _SelectorDatagramTransport(_SelectorTransport): - _buffer_factory = collections.deque - def __init__(self, loop, sock, protocol, address=None, waiter=None, extra=None): super().__init__(loop, sock, protocol, extra) @@ -1006,9 +1071,6 @@ def __init__(self, loop, sock, protocol, address=None, self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None) - def get_write_buffer_size(self): - return sum(len(data) for data, _ in self._buffer) - def _read_ready(self): if self._conn_lost: return @@ -1041,6 +1103,7 @@ def sendto(self, data, addr=None): return if not self._buffer: + assert self._buffer_size == 0 # Attempt to send it right away first. try: if self._address: @@ -1058,13 +1121,18 @@ def sendto(self, data, addr=None): 'Fatal write error on datagram transport') return - # Ensure that what we buffer is immutable. - self._buffer.append((bytes(data), addr)) + self._buffer.append((data, addr)) + self._buffer_size += len(data) self._maybe_pause_protocol() def _sendto_ready(self): + # Python 3.5 does not exports sendmmsg() syscall + # http://bugs.python.org/issue27022 + # So we use own loop instead of in-kernel one. while self._buffer: + assert self._buffer_size > 0 data, addr = self._buffer.popleft() + self._buffer_size -= len(data) try: if self._address: self._sock.send(data) @@ -1072,6 +1140,7 @@ def _sendto_ready(self): self._sock.sendto(data, addr) except (BlockingIOError, InterruptedError): self._buffer.appendleft((data, addr)) # Try again later. + self._buffer_size += len(data) break except OSError as exc: self._protocol.error_received(exc) @@ -1083,6 +1152,7 @@ def _sendto_ready(self): self._maybe_resume_protocol() # May append to buffer. if not self._buffer: + assert self._buffer_size == 0 self._loop.remove_writer(self._sock_fd) if self._closing: self._call_connection_lost(None) diff --git a/asyncio/transports.py b/asyncio/transports.py index 9a6d9197..081bc202 100644 --- a/asyncio/transports.py +++ b/asyncio/transports.py @@ -87,7 +87,19 @@ def write(self, data): This does not block; it buffers the data and arranges for it to be sent out asynchronously. + + If passed argument is bytearray, it is not allowed to change the size + of that object while write is in progress. Violation of that lead + to undefined behaviour. """ + + # In order to lock size of bytearray, we may wrap bytearray to + # memoryview, but this affects performance. Changing size of passed + # buffer after calling write() is very rare, obscure and suspicious + # logic, and even wrapping to memoryview will not help, since exception + # in user code will be raised when user changes size of passed + # bytearray. + raise NotImplementedError def writelines(self, list_of_data): @@ -140,7 +152,7 @@ class Transport(ReadTransport, WriteTransport): connection_made() method, passing it the transport. The implementation here raises NotImplemented for every method - except writelines(), which calls write() in a loop. + except writelines(). """ diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index d712749e..7f9f4461 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -1,6 +1,8 @@ """Selector event loop for Unix with signal handling.""" +import collections import errno +import itertools import os import signal import socket @@ -430,7 +432,8 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): "pipes, sockets and character devices") _set_nonblocking(self._fileno) self._protocol = protocol - self._buffer = [] + self._buffer = collections.deque() + self._buffer_size = 0 # Cached count of bytes in buffer self._conn_lost = 0 self._closing = False # Set when close() or write_eof() called. @@ -475,21 +478,21 @@ def __repr__(self): return '<%s>' % ' '.join(info) def get_write_buffer_size(self): - return sum(len(data) for data in self._buffer) + return self._buffer_size def _read_ready(self): # Pipe was closed by peer. if self._loop.get_debug(): logger.info("%r was closed by peer", self) if self._buffer: + assert self._buffer_size > 0 self._close(BrokenPipeError()) else: self._close() def write(self, data): assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) - if isinstance(data, bytearray): - data = memoryview(data) + if not data: return @@ -501,6 +504,7 @@ def write(self, data): return if not self._buffer: + assert self._buffer_size == 0 # Attempt to send it right away first. try: n = os.write(self._fileno, data) @@ -513,39 +517,84 @@ def write(self, data): if n == len(data): return elif n > 0: + # memoryview is required to eliminate memory copying while + # slicing. "unused" memory will be freed when this chunk is + # completely written + if not isinstance(data, memoryview): + data = memoryview(data) data = data[n:] self._loop.add_writer(self._fileno, self._write_ready) self._buffer.append(data) + self._buffer_size += len(data) self._maybe_pause_protocol() def _write_ready(self): - data = b''.join(self._buffer) - assert data, 'Data should not be empty' + # writing empty chunks to buffer is disallowed earlier + assert self._buffer and self._buffer_size > 0, 'Data must not be empty' - self._buffer.clear() try: - n = os.write(self._fileno, data) + # Note, this is not length in bytes, it is count of chunks in + # buffer + if len(self._buffer) > compat.SC_IOV_MAX: + # In order to minimize syscalls used for IO, we should call + # writev() again and again until self._buffer is sent, + # or writev() say that only part of requested data is sent. + # This will make code more complex. Instead, we assume that + # unsent part of self._buffer will be sent on next event loop + # iteration, even if one can be sent right now. + # According to benchmarking, this minor misoptimization does + # not hurt much. + buffers = itertools.islice(self._buffer, 0, compat.SC_IOV_MAX) + + # Unfortunatelly, Python 3.5 have the BUG - writev() does not + # accept generators. So, we wrap it in a list. + # http://bugs.python.org/issue27020 + buffers = list(buffers) + else: + buffers = self._buffer + n = os.writev(self._fileno, buffers) except (BlockingIOError, InterruptedError): - self._buffer.append(data) + return except Exception as exc: self._conn_lost += 1 # Remove writer here, _fatal_error() doesn't it # because _buffer is empty. self._loop.remove_writer(self._fileno) self._fatal_error(exc, 'Fatal write error on pipe transport') - else: - if n == len(data): - self._loop.remove_writer(self._fileno) - self._maybe_resume_protocol() # May append to buffer. - if not self._buffer and self._closing: - self._loop.remove_reader(self._fileno) - self._call_connection_lost(None) - return - elif n > 0: - data = data[n:] + return + + assert 0 <= n <= self._buffer_size + + while n > 0: + chunk = self._buffer.popleft() + chunk_length = len(chunk) + self._buffer_size -= chunk_length + n -= chunk_length + + if n < 0: + # only part of chunk was written, so push unread part of it + # back to _buffer. + # memoryview is required to eliminate memory copying while + # slicing. "unused" memory will be freed when this chunk is + # completely written + if not isinstance(chunk, memoryview): + chunk = memoryview(chunk) + chunk = chunk[n:] + self._buffer.appendleft(chunk) + self._buffer_size += len(chunk) + + if self._buffer: + assert self._buffer_size > 0 + return - self._buffer.append(data) # Try again later. + assert self._buffer_size == 0 + + self._loop.remove_writer(self._fileno) + self._maybe_resume_protocol() # May append to buffer. + if not self._buffer and self._closing: + self._loop.remove_reader(self._fileno) + self._call_connection_lost(None) def can_write_eof(self): return True @@ -556,6 +605,7 @@ def write_eof(self): assert self._pipe self._closing = True if not self._buffer: + assert self._buffer_size == 0 self._loop.remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, None) @@ -596,8 +646,10 @@ def _fatal_error(self, exc, message='Fatal error on pipe transport'): def _close(self, exc=None): self._closing = True if self._buffer: + assert self._buffer_size > 0 self._loop.remove_writer(self._fileno) self._buffer.clear() + self._buffer_size = 0 self._loop.remove_reader(self._fileno) self._loop.call_soon(self._call_connection_lost, exc) diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py index ff71c218..c885d20b 100644 --- a/tests/test_selector_events.py +++ b/tests/test_selector_events.py @@ -39,6 +39,21 @@ def list_to_buffer(l=()): return bytearray().join(l) +def dgram_prefill_buffer(stream, data, addr): + stream._buffer.append((data, addr)) + stream._buffer_size += len(data) + + +def ssl_prefill_buffer(stream, data): + stream._buffer.extend(data) + stream._buffer_size += len(data) + + +def sock_prefill_buffer(stream, data): + stream._buffer.append(data) + stream._buffer_size += len(data) + + def close_transport(transport): # Don't call transport.close() because the event loop and the selector # are mocked @@ -730,7 +745,7 @@ def test_close(self): def test_close_write_buffer(self): tr = self.create_transport() - tr._buffer.extend(b'data') + sock_prefill_buffer(tr, b'data') tr.close() self.assertFalse(self.loop.readers) @@ -739,13 +754,13 @@ def test_close_write_buffer(self): def test_force_close(self): tr = self.create_transport() - tr._buffer.extend(b'1') + sock_prefill_buffer(tr, b'1') self.loop.add_reader(7, mock.sentinel) self.loop.add_writer(7, mock.sentinel) tr._force_close(None) self.assertTrue(tr.is_closing()) - self.assertEqual(tr._buffer, list_to_buffer()) + self.assertSequenceEqual([], tr._buffer) self.assertFalse(self.loop.readers) self.assertFalse(self.loop.writers) @@ -925,18 +940,17 @@ def test_write_memoryview(self): def test_write_no_data(self): transport = self.socket_transport() - transport._buffer.extend(b'data') + sock_prefill_buffer(transport, b'data') transport.write(b'') self.assertFalse(self.sock.send.called) - self.assertEqual(list_to_buffer([b'data']), transport._buffer) + self.assertSequenceEqual([b'data'], transport._buffer) def test_write_buffer(self): transport = self.socket_transport() - transport._buffer.extend(b'data1') + sock_prefill_buffer(transport, b'data1') transport.write(b'data2') self.assertFalse(self.sock.send.called) - self.assertEqual(list_to_buffer([b'data1', b'data2']), - transport._buffer) + self.assertSequenceEqual([b'data1', b'data2'], transport._buffer) def test_write_partial(self): data = b'data' @@ -946,7 +960,7 @@ def test_write_partial(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + self.assertSequenceEqual([b'ta'], transport._buffer) def test_write_partial_bytearray(self): data = bytearray(b'data') @@ -956,7 +970,7 @@ def test_write_partial_bytearray(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + self.assertSequenceEqual([b'ta'], transport._buffer) self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated. def test_write_partial_memoryview(self): @@ -967,7 +981,7 @@ def test_write_partial_memoryview(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + self.assertSequenceEqual([b'ta'], transport._buffer) def test_write_partial_none(self): data = b'data' @@ -978,7 +992,7 @@ def test_write_partial_none(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data']), transport._buffer) + self.assertSequenceEqual([b'data'], transport._buffer) def test_write_tryagain(self): self.sock.send.side_effect = BlockingIOError @@ -988,7 +1002,7 @@ def test_write_tryagain(self): transport.write(data) self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data']), transport._buffer) + self.assertSequenceEqual([b'data'], transport._buffer) @mock.patch('asyncio.selector_events.logger') def test_write_exception(self, m_log): @@ -1026,25 +1040,25 @@ def test_write_closing(self): def test_write_ready(self): data = b'data' - self.sock.send.return_value = len(data) + self.sock.sendmsg.return_value = len(data) transport = self.socket_transport() - transport._buffer.extend(data) + sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() - self.assertTrue(self.sock.send.called) + self.assertTrue(self.sock.sendmsg.called) self.assertFalse(self.loop.writers) def test_write_ready_closing(self): data = b'data' - self.sock.send.return_value = len(data) + self.sock.sendmsg.return_value = len(data) transport = self.socket_transport() transport._closing = True - transport._buffer.extend(data) + sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() - self.assertTrue(self.sock.send.called) + self.assertTrue(self.sock.sendmsg.called) self.assertFalse(self.loop.writers) self.sock.close.assert_called_with() self.protocol.connection_lost.assert_called_with(None) @@ -1056,43 +1070,44 @@ def test_write_ready_no_data(self): def test_write_ready_partial(self): data = b'data' - self.sock.send.return_value = 2 + self.sock.sendmsg.return_value = 2 transport = self.socket_transport() - transport._buffer.extend(data) + sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'ta']), transport._buffer) + self.assertSequenceEqual([b'ta'], transport._buffer) def test_write_ready_partial_none(self): data = b'data' - self.sock.send.return_value = 0 + self.sock.sendmsg.return_value = 0 transport = self.socket_transport() - transport._buffer.extend(data) + sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data']), transport._buffer) + self.assertSequenceEqual([b'data'], transport._buffer) def test_write_ready_tryagain(self): - self.sock.send.side_effect = BlockingIOError + self.sock.sendmsg.side_effect = BlockingIOError transport = self.socket_transport() - transport._buffer = list_to_buffer([b'data1', b'data2']) + sock_prefill_buffer(transport, b'data1') + sock_prefill_buffer(transport, b'data2') self.loop.add_writer(7, transport._write_ready) transport._write_ready() self.loop.assert_writer(7, transport._write_ready) - self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer) + self.assertSequenceEqual([b'data1', b'data2'], transport._buffer) def test_write_ready_exception(self): - err = self.sock.send.side_effect = OSError() + err = self.sock.sendmsg.side_effect = OSError() transport = self.socket_transport() transport._fatal_error = mock.Mock() - transport._buffer.extend(b'data') + sock_prefill_buffer(transport, b'data') transport._write_ready() transport._fatal_error.assert_called_with( err, @@ -1112,12 +1127,12 @@ def test_write_eof_buffer(self): self.sock.send.side_effect = BlockingIOError tr.write(b'data') tr.write_eof() - self.assertEqual(tr._buffer, list_to_buffer([b'data'])) + self.assertSequenceEqual([b'data'], tr._buffer) self.assertTrue(tr._eof) self.assertFalse(self.sock.shutdown.called) - self.sock.send.side_effect = lambda _: 4 + self.sock.sendmsg.side_effect = lambda _: 4 tr._write_ready() - self.assertTrue(self.sock.send.called) + self.assertTrue(self.sock.sendmsg.called) self.sock.shutdown.assert_called_with(socket.SHUT_WR) tr.close() @@ -1246,7 +1261,7 @@ def test_write_memoryview(self): def test_write_no_data(self): transport = self._make_one() - transport._buffer.extend(b'data') + ssl_prefill_buffer(transport, b'data') transport.write(b'') self.assertEqual(list_to_buffer([b'data']), transport._buffer) @@ -1286,7 +1301,7 @@ def test_read_ready_write_wants_read(self): transport = self._make_one() transport._write_wants_read = True transport._write_ready = mock.Mock() - transport._buffer.extend(b'data') + ssl_prefill_buffer(transport, b'data') transport._read_ready() self.assertFalse(transport._write_wants_read) @@ -1350,7 +1365,7 @@ def test_read_ready_recv_exc(self): def test_write_ready_send(self): self.sslsock.send.return_value = 4 transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') transport._write_ready() self.assertEqual(list_to_buffer(), transport._buffer) self.assertTrue(self.sslsock.send.called) @@ -1358,7 +1373,8 @@ def test_write_ready_send(self): def test_write_ready_send_none(self): self.sslsock.send.return_value = 0 transport = self._make_one() - transport._buffer = list_to_buffer([b'data1', b'data2']) + ssl_prefill_buffer(transport, b'data1') + ssl_prefill_buffer(transport, b'data2') transport._write_ready() self.assertTrue(self.sslsock.send.called) self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer) @@ -1366,7 +1382,8 @@ def test_write_ready_send_none(self): def test_write_ready_send_partial(self): self.sslsock.send.return_value = 2 transport = self._make_one() - transport._buffer = list_to_buffer([b'data1', b'data2']) + ssl_prefill_buffer(transport, b'data1') + ssl_prefill_buffer(transport, b'data2') transport._write_ready() self.assertTrue(self.sslsock.send.called) self.assertEqual(list_to_buffer([b'ta1data2']), transport._buffer) @@ -1374,7 +1391,8 @@ def test_write_ready_send_partial(self): def test_write_ready_send_closing_partial(self): self.sslsock.send.return_value = 2 transport = self._make_one() - transport._buffer = list_to_buffer([b'data1', b'data2']) + ssl_prefill_buffer(transport, b'data1') + ssl_prefill_buffer(transport, b'data2') transport._write_ready() self.assertTrue(self.sslsock.send.called) self.assertFalse(self.sslsock.close.called) @@ -1382,7 +1400,7 @@ def test_write_ready_send_closing_partial(self): def test_write_ready_send_closing(self): self.sslsock.send.return_value = 4 transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') transport.close() transport._write_ready() self.protocol.connection_lost.assert_called_with(None) @@ -1391,14 +1409,13 @@ def test_write_ready_send_closing_empty_buffer(self): self.sslsock.send.return_value = 4 call_soon = self.loop.call_soon = mock.Mock() transport = self._make_one() - transport._buffer = list_to_buffer() transport.close() transport._write_ready() call_soon.assert_called_with(transport._call_connection_lost, None) def test_write_ready_send_retry(self): transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') self.sslsock.send.side_effect = ssl.SSLWantWriteError transport._write_ready() @@ -1410,7 +1427,7 @@ def test_write_ready_send_retry(self): def test_write_ready_send_read(self): transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') self.loop.remove_writer = mock.Mock() self.sslsock.send.side_effect = ssl.SSLWantReadError @@ -1423,7 +1440,7 @@ def test_write_ready_send_exc(self): err = self.sslsock.send.side_effect = OSError() transport = self._make_one() - transport._buffer = list_to_buffer([b'data']) + ssl_prefill_buffer(transport, b'data') transport._fatal_error = mock.Mock() transport._write_ready() transport._fatal_error.assert_called_with( @@ -1571,45 +1588,45 @@ def test_sendto_memoryview(self): def test_sendto_no_data(self): transport = self.datagram_transport() - transport._buffer.append((b'data', ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, b'data', ('0.0.0.0', 12345)) transport.sendto(b'', ()) self.assertFalse(self.sock.sendto.called) - self.assertEqual( - [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) + self.assertSequenceEqual( + [(b'data', ('0.0.0.0', 12345))], transport._buffer) def test_sendto_buffer(self): transport = self.datagram_transport() - transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, b'data1', ('0.0.0.0', 12345)) transport.sendto(b'data2', ('0.0.0.0', 12345)) self.assertFalse(self.sock.sendto.called) - self.assertEqual( + self.assertSequenceEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], - list(transport._buffer)) + transport._buffer) def test_sendto_buffer_bytearray(self): data2 = bytearray(b'data2') transport = self.datagram_transport() - transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, b'data1', ('0.0.0.0', 12345)) transport.sendto(data2, ('0.0.0.0', 12345)) self.assertFalse(self.sock.sendto.called) - self.assertEqual( + self.assertSequenceEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], - list(transport._buffer)) - self.assertIsInstance(transport._buffer[1][0], bytes) + transport._buffer) + self.assertIsInstance(transport._buffer[1][0], bytearray) def test_sendto_buffer_memoryview(self): data2 = memoryview(b'data2') transport = self.datagram_transport() - transport._buffer.append((b'data1', ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, b'data1', ('0.0.0.0', 12345)) transport.sendto(data2, ('0.0.0.0', 12345)) self.assertFalse(self.sock.sendto.called) - self.assertEqual( + self.assertSequenceEqual( [(b'data1', ('0.0.0.0', 12345)), (b'data2', ('0.0.0.0', 12345))], - list(transport._buffer)) - self.assertIsInstance(transport._buffer[1][0], bytes) + transport._buffer) + self.assertIsInstance(transport._buffer[1][0], memoryview) def test_sendto_tryagain(self): data = b'data' @@ -1620,8 +1637,8 @@ def test_sendto_tryagain(self): transport.sendto(data, ('0.0.0.0', 12345)) self.loop.assert_writer(7, transport._sendto_ready) - self.assertEqual( - [(b'data', ('0.0.0.0', 12345))], list(transport._buffer)) + self.assertSequenceEqual( + [(b'data', ('0.0.0.0', 12345))], transport._buffer) @mock.patch('asyncio.selector_events.logger') def test_sendto_exception(self, m_log): @@ -1691,7 +1708,7 @@ def test_sendto_ready(self): self.sock.sendto.return_value = len(data) transport = self.datagram_transport() - transport._buffer.append((data, ('0.0.0.0', 12345))) + dgram_prefill_buffer(transport, data, ('0.0.0.0', 12345)) self.loop.add_writer(7, transport._sendto_ready) transport._sendto_ready() self.assertTrue(self.sock.sendto.called) @@ -1705,7 +1722,7 @@ def test_sendto_ready_closing(self): transport = self.datagram_transport() transport._closing = True - transport._buffer.append((data, ())) + dgram_prefill_buffer(transport, data, ()) self.loop.add_writer(7, transport._sendto_ready) transport._sendto_ready() self.sock.sendto.assert_called_with(data, ()) @@ -1724,21 +1741,22 @@ def test_sendto_ready_tryagain(self): self.sock.sendto.side_effect = BlockingIOError transport = self.datagram_transport() - transport._buffer.extend([(b'data1', ()), (b'data2', ())]) + dgram_prefill_buffer(transport, b'data1', ()) + dgram_prefill_buffer(transport, b'data2', ()) self.loop.add_writer(7, transport._sendto_ready) transport._sendto_ready() self.loop.assert_writer(7, transport._sendto_ready) - self.assertEqual( + self.assertSequenceEqual( [(b'data1', ()), (b'data2', ())], - list(transport._buffer)) + transport._buffer) def test_sendto_ready_exception(self): err = self.sock.sendto.side_effect = RuntimeError() transport = self.datagram_transport() transport._fatal_error = mock.Mock() - transport._buffer.append((b'data', ())) + dgram_prefill_buffer(transport, b'data', ()) transport._sendto_ready() transport._fatal_error.assert_called_with( @@ -1750,7 +1768,7 @@ def test_sendto_ready_error_received(self): transport = self.datagram_transport() transport._fatal_error = mock.Mock() - transport._buffer.append((b'data', ())) + dgram_prefill_buffer(transport, b'data', ()) transport._sendto_ready() self.assertFalse(transport._fatal_error.called) @@ -1760,7 +1778,7 @@ def test_sendto_ready_error_received_connection(self): transport = self.datagram_transport(address=('0.0.0.0', 1)) transport._fatal_error = mock.Mock() - transport._buffer.append((b'data', ())) + dgram_prefill_buffer(transport, b'data', ()) transport._sendto_ready() self.assertFalse(transport._fatal_error.called) diff --git a/tests/test_unix_events.py b/tests/test_unix_events.py index 22dc6880..0f161d70 100644 --- a/tests/test_unix_events.py +++ b/tests/test_unix_events.py @@ -1,6 +1,7 @@ """Tests for unix_events.py.""" import collections +import contextlib import errno import io import os @@ -35,6 +36,54 @@ def close_pipe_transport(transport): transport._pipe = None +@contextlib.contextmanager +def patched_os_writev(): + """Patch os.writev() in a specific way. + + The unittest.mock stores arguments by reference. So if argument is mutable + and changed after tested call, we can not check arguments values correctly. + + Example: + + buf = [1, 2, 3] + writev(buf) # mock remembers reference to buf, not contents. + buf.pop() + ... + writev.assert_called_with([1, 2, 3]) # will fail + + Note, that writev() was called with [1,2,3], but mock see (in assert) that + buf is [1, 2] + """ + + class Wrapper: + def __init__(self, wrapped): + self._wrapped = wrapped + + def __call__(self, fd, buffers): + return self._wrapped(fd, list(buffers)) + + def __getattr__(self, name): + if name.startswith('_'): + return object.__getattr__(self, name) + else: + return getattr(self._wrapped, name) + + def __setattr__(self, name, val): + if name.startswith('_'): + object.__setattr__(self, name, val) + else: + setattr(self._wrapped, name, val) + + m_writev = Wrapper(mock.Mock()) + with mock.patch('os.writev', m_writev): + yield m_writev + + +def pipe_prefill_buffer(stream, data): + stream._buffer.append(data) + stream._buffer_size += len(data) + + @unittest.skipUnless(signal, 'Signals are not supported') class SelectorEventLoopSignalTests(test_utils.TestCase): @@ -518,7 +567,7 @@ def test_write(self, m_write): tr.write(b'data') m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertSequenceEqual([], tr._buffer) @mock.patch('os.write') def test_write_no_data(self, m_write): @@ -526,7 +575,7 @@ def test_write_no_data(self, m_write): tr.write(b'') self.assertFalse(m_write.called) self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertSequenceEqual([], tr._buffer) @mock.patch('os.write') def test_write_partial(self, m_write): @@ -535,17 +584,17 @@ def test_write_partial(self, m_write): tr.write(b'data') m_write.assert_called_with(5, b'data') self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'ta'], tr._buffer) + self.assertSequenceEqual([b'ta'], tr._buffer) @mock.patch('os.write') def test_write_buffer(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'previous'] + pipe_prefill_buffer(tr, b'previous') tr.write(b'data') self.assertFalse(m_write.called) self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'previous', b'data'], tr._buffer) + self.assertSequenceEqual([b'previous', b'data'], tr._buffer) @mock.patch('os.write') def test_write_again(self, m_write): @@ -554,7 +603,7 @@ def test_write_again(self, m_write): tr.write(b'data') m_write.assert_called_with(5, b'data') self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) + self.assertSequenceEqual([b'data'], tr._buffer) @mock.patch('asyncio.unix_events.logger') @mock.patch('os.write') @@ -566,7 +615,7 @@ def test_write_err(self, m_write, m_log): tr.write(b'data') m_write.assert_called_with(5, b'data') self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertSequenceEqual([], tr._buffer) tr._fatal_error.assert_called_with( err, 'Fatal write error on pipe transport') @@ -602,98 +651,105 @@ def test__read_ready(self): test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) - @mock.patch('os.write') - def test__write_ready(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.return_value = 4 - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) - - @mock.patch('os.write') - def test__write_ready_partial(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.return_value = 3 - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'a'], tr._buffer) - - @mock.patch('os.write') - def test__write_ready_again(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.side_effect = BlockingIOError() - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) - - @mock.patch('os.write') - def test__write_ready_empty(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.return_value = 0 - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.loop.assert_writer(5, tr._write_ready) - self.assertEqual([b'data'], tr._buffer) + def test__write_ready(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.return_value = 4 + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.assertFalse(self.loop.writers) + self.assertSequenceEqual([], tr._buffer) + + def test__write_ready_partial(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.return_value = 3 + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.loop.assert_writer(5, tr._write_ready) + self.assertSequenceEqual([b'a'], tr._buffer) + + def test__write_ready_again(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.side_effect = BlockingIOError() + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.loop.assert_writer(5, tr._write_ready) + self.assertSequenceEqual([b'da', b'ta'], tr._buffer) + + def test__write_ready_empty(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.return_value = 0 + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.loop.assert_writer(5, tr._write_ready) + self.assertSequenceEqual([b'da', b'ta'], tr._buffer) @mock.patch('asyncio.log.logger.error') - @mock.patch('os.write') - def test__write_ready_err(self, m_write, m_logexc): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._buffer = [b'da', b'ta'] - m_write.side_effect = err = OSError() - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.assertFalse(self.loop.writers) - self.assertFalse(self.loop.readers) - self.assertEqual([], tr._buffer) - self.assertTrue(tr.is_closing()) - m_logexc.assert_called_with( - test_utils.MockPattern( - 'Fatal write error on pipe transport' - '\nprotocol:.*\ntransport:.*'), - exc_info=(OSError, MOCK_ANY, MOCK_ANY)) - self.assertEqual(1, tr._conn_lost) - test_utils.run_briefly(self.loop) - self.protocol.connection_lost.assert_called_with(err) - - @mock.patch('os.write') - def test__write_ready_closing(self, m_write): - tr = self.write_pipe_transport() - self.loop.add_writer(5, tr._write_ready) - tr._closing = True - tr._buffer = [b'da', b'ta'] - m_write.return_value = 4 - tr._write_ready() - m_write.assert_called_with(5, b'data') - self.assertFalse(self.loop.writers) - self.assertFalse(self.loop.readers) - self.assertEqual([], tr._buffer) - self.protocol.connection_lost.assert_called_with(None) - self.pipe.close.assert_called_with() + def test__write_ready_err(self, m_logexc): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.side_effect = err = OSError() + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.assertFalse(self.loop.writers) + self.assertFalse(self.loop.readers) + self.assertSequenceEqual([], tr._buffer) + self.assertTrue(tr.is_closing()) + m_logexc.assert_called_with( + test_utils.MockPattern( + 'Fatal write error on pipe transport' + '\nprotocol:.*\ntransport:.*'), + exc_info=(OSError, MOCK_ANY, MOCK_ANY)) + self.assertEqual(1, tr._conn_lost) + test_utils.run_briefly(self.loop) + self.protocol.connection_lost.assert_called_with(err) + + def test__write_ready_closing(self): + with patched_os_writev() as m_writev: + tr = self.write_pipe_transport() + self.loop.add_writer(5, tr._write_ready) + tr._closing = True + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') + m_writev.return_value = 4 + tr._write_ready() + m_writev.assert_called_with(5, [b'da', b'ta']) + self.assertFalse(self.loop.writers) + self.assertFalse(self.loop.readers) + self.assertSequenceEqual([], tr._buffer) + self.protocol.connection_lost.assert_called_with(None) + self.pipe.close.assert_called_with() @mock.patch('os.write') def test_abort(self, m_write): tr = self.write_pipe_transport() self.loop.add_writer(5, tr._write_ready) self.loop.add_reader(5, tr._read_ready) - tr._buffer = [b'da', b'ta'] + pipe_prefill_buffer(tr, b'da') + pipe_prefill_buffer(tr, b'ta') tr.abort() self.assertFalse(m_write.called) self.assertFalse(self.loop.readers) self.assertFalse(self.loop.writers) - self.assertEqual([], tr._buffer) + self.assertSequenceEqual([], tr._buffer) self.assertTrue(tr.is_closing()) test_utils.run_briefly(self.loop) self.protocol.connection_lost.assert_called_with(None) @@ -750,7 +806,7 @@ def test_write_eof(self): def test_write_eof_pending(self): tr = self.write_pipe_transport() - tr._buffer = [b'data'] + pipe_prefill_buffer(tr, b'data') tr.write_eof() self.assertTrue(tr.is_closing()) self.assertFalse(self.protocol.connection_lost.called) From 906e7632017f83768881d24769aef1893866dcf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D1=80=D0=B5=D0=BD=D0=B1=D0=B5=D1=80=D0=B3=20?= =?UTF-8?q?=D0=9C=D0=B0=D1=80=D0=BA=20=28imac=29?= Date: Tue, 3 May 2016 03:16:14 +0500 Subject: [PATCH 3/4] _maybe_resume_protocol() for UNIX pipes called before checking that buffer is empty --- asyncio/unix_events.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py index 7f9f4461..ec637b7b 100644 --- a/asyncio/unix_events.py +++ b/asyncio/unix_events.py @@ -584,6 +584,8 @@ def _write_ready(self): self._buffer.appendleft(chunk) self._buffer_size += len(chunk) + self._maybe_resume_protocol() # May append to buffer. + if self._buffer: assert self._buffer_size > 0 return @@ -591,8 +593,8 @@ def _write_ready(self): assert self._buffer_size == 0 self._loop.remove_writer(self._fileno) - self._maybe_resume_protocol() # May append to buffer. - if not self._buffer and self._closing: + + if self._closing: self._loop.remove_reader(self._fileno) self._call_connection_lost(None) From 677769dadf00b0301228d01e7648c5564f1911bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D1=80=D0=B5=D0=BD=D0=B1=D0=B5=D1=80=D0=B3=20?= =?UTF-8?q?=D0=9C=D0=B0=D1=80=D0=BA=20=28imac=29?= Date: Sun, 29 May 2016 10:28:14 +0500 Subject: [PATCH 4/4] Workaround missing sendmsg() for Windows --- asyncio/compat.py | 6 ++++ asyncio/selector_events.py | 40 +++++++++++++++-------- tests/test_selector_events.py | 60 ++++++++++++++++++++++++++++------- 3 files changed, 80 insertions(+), 26 deletions(-) diff --git a/asyncio/compat.py b/asyncio/compat.py index 61709a7c..7c145bcc 100644 --- a/asyncio/compat.py +++ b/asyncio/compat.py @@ -1,6 +1,7 @@ """Compatibility helpers for the different Python versions.""" import os +import socket import sys PY34 = sys.version_info >= (3, 4) @@ -29,3 +30,8 @@ def flatten_list_bytes(list_of_data): SC_IOV_MAX = 16 assert SC_IOV_MAX >= 16 + + +# Python for Windows still have no socket.sendmsg() +# http://bugs.python.org/issue27149 +HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py index f6eaa919..e42fbcfc 100644 --- a/asyncio/selector_events.py +++ b/asyncio/selector_events.py @@ -743,21 +743,33 @@ def _write_ready(self): if self._conn_lost: return try: - # Note, this is not length in bytes, it is count of chunks in - # buffer - if len(self._buffer) > compat.SC_IOV_MAX: - # In order to minimize syscalls used for IO, we should call - # sendmsg() again and again until self._buffer is sent, - # or sendmsg() say that only part of requested data is sent. - # This will make code more complex. Instead, we assume that - # unsent part of self._buffer will be sent on next event loop - # iteration, even if one can be sent right now. - # According to benchmarking, this minor misoptimization does - # not hurt much. - buffers = itertools.islice(self._buffer, 0, compat.SC_IOV_MAX) + if compat.HAS_SENDMSG: + # Note, this is not length in bytes, it is count of chunks in + # buffer + if len(self._buffer) > compat.SC_IOV_MAX: + # In order to minimize syscalls used for IO, we should call + # sendmsg() again and again until self._buffer is sent, + # or sendmsg() say that only part of requested data is sent. + # This will make code more complex. Instead, we assume that + # unsent part of self._buffer will be sent on next event loop + # iteration, even if one can be sent right now. + # According to benchmarking, this minor misoptimization does + # not hurt much. + buffers = itertools.islice(self._buffer, 0, compat.SC_IOV_MAX) + else: + buffers = self._buffer + n = self._sock.sendmsg(buffers) else: - buffers = self._buffer - n = self._sock.sendmsg(buffers) + if len(self._buffer) > 1: + # 1. Flattening buffers here in order not to copy memory + # everytime chunk is added. + # 2. Also it is possible to remove flattening, and send just + # first chunk, but this will be probably slower. + data = compat.flatten_list_bytes(self._buffer) + # self._buffer_size is not changed in that operation + self._buffer.clear() + self._buffer.append(data) + n = self._sock.send(self._buffer[0]) except (BlockingIOError, InterruptedError): return except Exception as exc: diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py index c885d20b..fc7dbecf 100644 --- a/tests/test_selector_events.py +++ b/tests/test_selector_events.py @@ -21,6 +21,7 @@ MOCK_ANY = mock.ANY +HAS_SENDMSG = hasattr(socket.socket, 'sendmsg') class TestBaseSelectorEventLoop(BaseSelectorEventLoop): @@ -1040,25 +1041,33 @@ def test_write_closing(self): def test_write_ready(self): data = b'data' - self.sock.sendmsg.return_value = len(data) + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.return_value = len(data) transport = self.socket_transport() sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() - self.assertTrue(self.sock.sendmsg.called) + self.assertTrue(sendfun.called) self.assertFalse(self.loop.writers) def test_write_ready_closing(self): data = b'data' - self.sock.sendmsg.return_value = len(data) + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.return_value = len(data) transport = self.socket_transport() transport._closing = True sock_prefill_buffer(transport, data) self.loop.add_writer(7, transport._write_ready) transport._write_ready() - self.assertTrue(self.sock.sendmsg.called) + self.assertTrue(sendfun.called) self.assertFalse(self.loop.writers) self.sock.close.assert_called_with() self.protocol.connection_lost.assert_called_with(None) @@ -1070,7 +1079,11 @@ def test_write_ready_no_data(self): def test_write_ready_partial(self): data = b'data' - self.sock.sendmsg.return_value = 2 + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.return_value = 2 transport = self.socket_transport() sock_prefill_buffer(transport, data) @@ -1081,7 +1094,11 @@ def test_write_ready_partial(self): def test_write_ready_partial_none(self): data = b'data' - self.sock.sendmsg.return_value = 0 + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.return_value = 0 transport = self.socket_transport() sock_prefill_buffer(transport, data) @@ -1091,7 +1108,11 @@ def test_write_ready_partial_none(self): self.assertSequenceEqual([b'data'], transport._buffer) def test_write_ready_tryagain(self): - self.sock.sendmsg.side_effect = BlockingIOError + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + sendfun.side_effect = BlockingIOError transport = self.socket_transport() sock_prefill_buffer(transport, b'data1') @@ -1100,10 +1121,17 @@ def test_write_ready_tryagain(self): transport._write_ready() self.loop.assert_writer(7, transport._write_ready) - self.assertSequenceEqual([b'data1', b'data2'], transport._buffer) + if HAS_SENDMSG: + self.assertSequenceEqual([b'data1', b'data2'], transport._buffer) + else: + self.assertSequenceEqual([b'data1data2'], transport._buffer) def test_write_ready_exception(self): - err = self.sock.sendmsg.side_effect = OSError() + if HAS_SENDMSG: + sendfun = self.sock.sendmsg + else: + sendfun = self.sock.send + err = sendfun.side_effect = OSError() transport = self.socket_transport() transport._fatal_error = mock.Mock() @@ -1124,15 +1152,23 @@ def test_write_eof(self): def test_write_eof_buffer(self): tr = self.socket_transport() - self.sock.send.side_effect = BlockingIOError + if HAS_SENDMSG: + sendfun1 = self.sock.send + sendfun2 = self.sock.sendmsg + else: + sendfun1 = self.sock.send + sendfun2 = self.sock.send + sendfun1.side_effect = BlockingIOError + tr.write(b'data') tr.write_eof() self.assertSequenceEqual([b'data'], tr._buffer) self.assertTrue(tr._eof) self.assertFalse(self.sock.shutdown.called) - self.sock.sendmsg.side_effect = lambda _: 4 + + sendfun2.side_effect = lambda _: 4 tr._write_ready() - self.assertTrue(self.sock.sendmsg.called) + self.assertTrue(sendfun2.called) self.sock.shutdown.assert_called_with(socket.SHUT_WR) tr.close()