-
-
Notifications
You must be signed in to change notification settings - Fork 184
Fix #338 os.writev() instead of os.write(b.join()) and sendmsg() instead of send() #339
Changes from all commits
f874fff
b11a687
906e763
677769d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| import collections | ||
| import errno | ||
| import functools | ||
| import itertools | ||
| import socket | ||
| import warnings | ||
| try: | ||
|
|
@@ -502,7 +503,8 @@ class _SelectorTransport(transports._FlowControlMixin, | |
|
|
||
| max_size = 256 * 1024 # Buffer size passed to recv(). | ||
|
|
||
| _buffer_factory = bytearray # Constructs initial value for self._buffer. | ||
| # Constructs initial value for self._buffer. | ||
| _buffer_factory = collections.deque | ||
|
|
||
| # Attribute used in the destructor: it must be set even if the constructor | ||
| # is not called (see _SelectorSslTransport which may start by raising an | ||
|
|
@@ -524,6 +526,7 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): | |
| self._protocol_connected = True | ||
| self._server = server | ||
| self._buffer = self._buffer_factory() | ||
| self._buffer_size = 0 # Cached count of bytes in buffer | ||
| self._conn_lost = 0 # Set when call to connection_lost scheduled. | ||
| self._closing = False # Set when close() called. | ||
| if self._server is not None: | ||
|
|
@@ -569,6 +572,7 @@ def close(self): | |
| self._closing = True | ||
| self._loop.remove_reader(self._sock_fd) | ||
| if not self._buffer: | ||
| assert self._buffer_size == 0 | ||
| self._conn_lost += 1 | ||
| self._loop.remove_writer(self._sock_fd) | ||
| self._loop.call_soon(self._call_connection_lost, None) | ||
|
|
@@ -600,7 +604,9 @@ def _force_close(self, exc): | |
| if self._conn_lost: | ||
| return | ||
| if self._buffer: | ||
| assert self._buffer_size > 0 | ||
| self._buffer.clear() | ||
| self._buffer_size = 0 | ||
| self._loop.remove_writer(self._sock_fd) | ||
| if not self._closing: | ||
| self._closing = True | ||
|
|
@@ -623,7 +629,7 @@ def _call_connection_lost(self, exc): | |
| self._server = None | ||
|
|
||
| def get_write_buffer_size(self): | ||
| return len(self._buffer) | ||
| return self._buffer_size | ||
|
|
||
|
|
||
| class _SelectorSocketTransport(_SelectorTransport): | ||
|
|
@@ -703,6 +709,7 @@ def write(self, data): | |
| return | ||
|
|
||
| if not self._buffer: | ||
| assert self._buffer_size == 0 | ||
| # Optimization: try to send now. | ||
| try: | ||
| n = self._sock.send(data) | ||
|
|
@@ -712,45 +719,106 @@ def write(self, data): | |
| self._fatal_error(exc, 'Fatal write error on socket transport') | ||
| return | ||
| else: | ||
| data = data[n:] | ||
| if not data: | ||
| if n == len(data): | ||
| return | ||
| if n > 0: | ||
| # memoryview is required to eliminate memory copying while | ||
| # slicing. "unused" memory will be freed when this chunk is | ||
| # completely written | ||
| if not isinstance(data, memoryview): | ||
| data = memoryview(data) | ||
| data = data[n:] | ||
|
|
||
| # Not all was written; register write handler. | ||
| self._loop.add_writer(self._sock_fd, self._write_ready) | ||
|
|
||
| # Add it to the buffer. | ||
| self._buffer.extend(data) | ||
| self._buffer.append(data) | ||
| self._buffer_size += len(data) | ||
| self._maybe_pause_protocol() | ||
|
|
||
| def _write_ready(self): | ||
| assert self._buffer, 'Data should not be empty' | ||
| assert self._buffer and self._buffer_size > 0, 'Data must not be empty' | ||
|
|
||
| if self._conn_lost: | ||
| return | ||
| try: | ||
| n = self._sock.send(self._buffer) | ||
| if compat.HAS_SENDMSG: | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HAS or HAVE ? :)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HAS is better. |
||
| # Note, this is not length in bytes, it is count of chunks in | ||
| # buffer | ||
| if len(self._buffer) > compat.SC_IOV_MAX: | ||
| # In order to minimize syscalls used for IO, we should call | ||
| # sendmsg() again and again until self._buffer is sent, | ||
| # or sendmsg() say that only part of requested data is sent. | ||
| # This will make code more complex. Instead, we assume that | ||
| # unsent part of self._buffer will be sent on next event loop | ||
| # iteration, even if one can be sent right now. | ||
| # According to benchmarking, this minor misoptimization does | ||
| # not hurt much. | ||
| buffers = itertools.islice(self._buffer, 0, compat.SC_IOV_MAX) | ||
| else: | ||
| buffers = self._buffer | ||
| n = self._sock.sendmsg(buffers) | ||
| else: | ||
| if len(self._buffer) > 1: | ||
| # 1. Flattening buffers here in order not to copy memory | ||
| # everytime chunk is added. | ||
| # 2. Also it is possible to remove flattening, and send just | ||
| # first chunk, but this will be probably slower. | ||
| data = compat.flatten_list_bytes(self._buffer) | ||
| # self._buffer_size is not changed in that operation | ||
| self._buffer.clear() | ||
| self._buffer.append(data) | ||
| n = self._sock.send(self._buffer[0]) | ||
| except (BlockingIOError, InterruptedError): | ||
| pass | ||
| return | ||
| except Exception as exc: | ||
| self._loop.remove_writer(self._sock_fd) | ||
| self._buffer.clear() | ||
| self._buffer_size = 0 | ||
| self._fatal_error(exc, 'Fatal write error on socket transport') | ||
| else: | ||
| if n: | ||
| del self._buffer[:n] | ||
| self._maybe_resume_protocol() # May append to buffer. | ||
| if not self._buffer: | ||
| self._loop.remove_writer(self._sock_fd) | ||
| if self._closing: | ||
| self._call_connection_lost(None) | ||
| elif self._eof: | ||
| self._sock.shutdown(socket.SHUT_WR) | ||
| return | ||
|
|
||
| assert 0 <= n <= self._buffer_size | ||
|
|
||
| while n > 0: | ||
| chunk = self._buffer.popleft() | ||
| chunk_len = len(chunk) | ||
| self._buffer_size -= chunk_len | ||
| n -= chunk_len | ||
|
|
||
| if n < 0: | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This case is covered by |
||
| # only part of chunk was written, so push unread part of it | ||
| # back to _buffer. | ||
| # memoryview is required to eliminate memory copying while | ||
| # slicing. "unused" memory will be freed when this chunk is | ||
| # completely written | ||
| if not isinstance(chunk, memoryview): | ||
| chunk = memoryview(chunk) | ||
| chunk = chunk[n:] | ||
| self._buffer.appendleft(chunk) | ||
| self._buffer_size += len(chunk) | ||
|
|
||
| self._maybe_resume_protocol() # May append to buffer. | ||
|
|
||
| if self._buffer: | ||
| assert self._buffer_size > 0 | ||
| return | ||
|
|
||
| assert self._buffer_size == 0 | ||
|
|
||
| self._loop.remove_writer(self._sock_fd) | ||
| if self._closing: | ||
| self._call_connection_lost(None) | ||
| elif self._eof: | ||
| self._sock.shutdown(socket.SHUT_WR) | ||
|
|
||
| def write_eof(self): | ||
| if self._eof: | ||
| return | ||
| self._eof = True | ||
| if not self._buffer: | ||
| assert self._buffer_size == 0 | ||
| self._sock.shutdown(socket.SHUT_WR) | ||
|
|
||
| def can_write_eof(self): | ||
|
|
@@ -759,6 +827,8 @@ def can_write_eof(self): | |
|
|
||
| class _SelectorSslTransport(_SelectorTransport): | ||
|
|
||
| # Unfortunately, SSLSocket provides neither sendmsg() nor writev() | ||
| # TODO: benchmark merging buffers in SSL socket as opposed to in Python | ||
| _buffer_factory = bytearray | ||
|
|
||
| def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None, | ||
|
|
@@ -904,6 +974,7 @@ def _read_ready(self): | |
| self._write_ready() | ||
|
|
||
| if self._buffer: | ||
| assert self._buffer_size > 0 | ||
| self._loop.add_writer(self._sock_fd, self._write_ready) | ||
|
|
||
| try: | ||
|
|
@@ -941,6 +1012,7 @@ def _write_ready(self): | |
| self._loop.add_reader(self._sock_fd, self._read_ready) | ||
|
|
||
| if self._buffer: | ||
| assert self._buffer_size > 0 | ||
| try: | ||
| n = self._sock.send(self._buffer) | ||
| except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError): | ||
|
|
@@ -952,15 +1024,20 @@ def _write_ready(self): | |
| except Exception as exc: | ||
| self._loop.remove_writer(self._sock_fd) | ||
| self._buffer.clear() | ||
| self._buffer_size = 0 | ||
| self._fatal_error(exc, 'Fatal write error on SSL transport') | ||
| return | ||
|
|
||
| assert 0 <= n <= self._buffer_size | ||
|
|
||
| if n: | ||
| del self._buffer[:n] | ||
| self._buffer_size -= n | ||
|
|
||
| self._maybe_resume_protocol() # May append to buffer. | ||
|
|
||
| if not self._buffer: | ||
| assert self._buffer_size == 0 | ||
| self._loop.remove_writer(self._sock_fd) | ||
| if self._closing: | ||
| self._call_connection_lost(None) | ||
|
|
@@ -979,10 +1056,12 @@ def write(self, data): | |
| return | ||
|
|
||
| if not self._buffer: | ||
| assert self._buffer_size == 0 | ||
| self._loop.add_writer(self._sock_fd, self._write_ready) | ||
|
|
||
| # Add it to the buffer. | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know why this requirement exists. So I thought twice, ran some checks, and decided to remove it in order to eliminate the extra data copying.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since caller may change data. For example: # Avoid copy as possible
buf = bytearray(4096)
view = memoryview(buf)
while True:
n = file.readinto(buf)
if n == 0:
break
transport.send(view[:n])
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And https://github.com/python/asyncio/pull/339/files#r64995800
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems the code above should not be used: it is never guaranteed to work. Passing mutable objects to external functions is dangerous. Since we use internal buffering, we can eliminate copying only by stealing buffers, i.e. to avoid any copying, the user should write: # Avoid copy as possible
while True:
view = memoryview(bytearray(4096))
n = file.readinto(view)
if n == 0:
break
transport.send(view[:n])
(I did not check that code, but it should work.) This example may be included in the documentation.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I agree. There should be one copy at least.
I can't agree.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @1st1 What do you think about this? I still prefer the bytearray approach. When performance is important, there is uvloop now.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sorry, I'm away from home (at EuroPython). Will read this discussion once I'm at home, in two-three days.
We need to find a balance between complexity of code in asyncio & performance. Having uvloop around shouldn't stop us from making asyncio fast too. |
||
| self._buffer.extend(data) | ||
| self._buffer_size += len(data) | ||
| self._maybe_pause_protocol() | ||
|
|
||
| def can_write_eof(self): | ||
|
|
@@ -991,8 +1070,6 @@ def can_write_eof(self): | |
|
|
||
| class _SelectorDatagramTransport(_SelectorTransport): | ||
|
|
||
| _buffer_factory = collections.deque | ||
|
|
||
| def __init__(self, loop, sock, protocol, address=None, | ||
| waiter=None, extra=None): | ||
| super().__init__(loop, sock, protocol, extra) | ||
|
|
@@ -1006,9 +1083,6 @@ def __init__(self, loop, sock, protocol, address=None, | |
| self._loop.call_soon(futures._set_result_unless_cancelled, | ||
| waiter, None) | ||
|
|
||
| def get_write_buffer_size(self): | ||
| return sum(len(data) for data, _ in self._buffer) | ||
|
|
||
| def _read_ready(self): | ||
| if self._conn_lost: | ||
| return | ||
|
|
@@ -1041,6 +1115,7 @@ def sendto(self, data, addr=None): | |
| return | ||
|
|
||
| if not self._buffer: | ||
| assert self._buffer_size == 0 | ||
| # Attempt to send it right away first. | ||
| try: | ||
| if self._address: | ||
|
|
@@ -1058,20 +1133,26 @@ def sendto(self, data, addr=None): | |
| 'Fatal write error on datagram transport') | ||
| return | ||
|
|
||
| # Ensure that what we buffer is immutable. | ||
| self._buffer.append((bytes(data), addr)) | ||
| self._buffer.append((data, addr)) | ||
| self._buffer_size += len(data) | ||
| self._maybe_pause_protocol() | ||
|
|
||
| def _sendto_ready(self): | ||
| # Python 3.5 does not export the sendmmsg() syscall | ||
| # http://bugs.python.org/issue27022 | ||
| # So we use own loop instead of in-kernel one. | ||
| while self._buffer: | ||
| assert self._buffer_size > 0 | ||
| data, addr = self._buffer.popleft() | ||
| self._buffer_size -= len(data) | ||
| try: | ||
| if self._address: | ||
| self._sock.send(data) | ||
| else: | ||
| self._sock.sendto(data, addr) | ||
| except (BlockingIOError, InterruptedError): | ||
| self._buffer.appendleft((data, addr)) # Try again later. | ||
| self._buffer_size += len(data) | ||
| break | ||
| except OSError as exc: | ||
| self._protocol.error_received(exc) | ||
|
|
@@ -1083,6 +1164,7 @@ def _sendto_ready(self): | |
|
|
||
| self._maybe_resume_protocol() # May append to buffer. | ||
| if not self._buffer: | ||
| assert self._buffer_size == 0 | ||
| self._loop.remove_writer(self._sock_fd) | ||
| if self._closing: | ||
| self._call_connection_lost(None) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,7 +87,19 @@ def write(self, data): | |
|
|
||
| This does not block; it buffers the data and arranges for it | ||
| to be sent out asynchronously. | ||
|
|
||
| If the passed argument is a bytearray, its size must not be changed | ||
| while the write is in progress. Violating this leads | ||
| to undefined behaviour. | ||
| """ | ||
|
|
||
| # In order to lock size of bytearray, we may wrap bytearray to | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @1st1 @gvanrossum
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My opinion on this: The performance impact is negligible (since
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, it degrades performance....about 50-60 MB/sec on my PC.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 50-60 MB/sec relative to what? And what's benchmark? If it's the one that makes 5000 writes in a row of course it will run a tad slower, but that's a very unrealistic scenario.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even And so, we should convert to If flattening to Why we should support idiotic code? Modifying still-not-written buffers was never allowed, so if one rely on specific undocumented implementation, he must suffer.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Wow, I was absolutely under impression that it would. Maybe
Probably we shouldn't care. If
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See in action:
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gvanrossum Do you know info about https://www.python.org/dev/peps/pep-0351 ? http://bugs.python.org/issue27070 says that it was rejected. This PEP is exactly what I want.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Using single bytearray instead of list (or deque) of memoryview for buffer can allow such use cases, too. Unless deque[Union[memoryview, bytes, bytearray]] is bit faster, I prefer bytearray for it's safety and simplicity.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @methane (edited to show only difference between |
||
| # memoryview, but this affects performance. Changing size of passed | ||
| # buffer after calling write() is very rare, obscure and suspicious | ||
| # logic, and even wrapping to memoryview will not help, since exception | ||
| # in user code will be raised when user changes size of passed | ||
| # bytearray. | ||
|
|
||
| raise NotImplementedError | ||
|
|
||
| def writelines(self, list_of_data): | ||
|
|
@@ -140,7 +152,7 @@ class Transport(ReadTransport, WriteTransport): | |
| connection_made() method, passing it the transport. | ||
|
|
||
| The implementation here raises NotImplemented for every method | ||
| except writelines(), which calls write() in a loop. | ||
| except writelines(). | ||
| """ | ||
|
|
||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks to the AppVeyor check ( 0788e04 ), I found that the Windows build was broken:
https://ci.appveyor.com/project/1st1/asyncio/build/1.0.5/job/m2humsxc1tp1jxya
Now fixed