From c6a0179de45587f3bc2c178533e9e8563db7361f Mon Sep 17 00:00:00 2001 From: Fantix King Date: Sat, 9 May 2020 13:12:43 -0500 Subject: [PATCH 1/2] forcely add UV_HANDLE_READABLE on pipe_t * in order to detect peer close on O_WRONLY pipe_t * partially reverted d8fe153 * refs libuv/libuv#2058 * refs #317 * fixes #311, fixes #312 --- tests/test_pipes.py | 30 +++++++++++++++++++ uvloop/handles/pipe.pxd | 4 --- uvloop/handles/pipe.pyx | 66 ++--------------------------------------- uvloop/includes/uv.pxd | 10 +++++++ 4 files changed, 43 insertions(+), 67 deletions(-) diff --git a/tests/test_pipes.py b/tests/test_pipes.py index 7d332cb7..c2b8a016 100644 --- a/tests/test_pipes.py +++ b/tests/test_pipes.py @@ -43,6 +43,7 @@ def connection_lost(self, exc): class MyWritePipeProto(asyncio.BaseProtocol): done = None + paused = False def __init__(self, loop=None): self.state = 'INITIAL' @@ -61,6 +62,12 @@ def connection_lost(self, exc): if self.done: self.done.set_result(None) + def pause_writing(self): + self.paused = True + + def resume_writing(self): + self.paused = False + class _BasePipeTest: def test_read_pipe(self): @@ -241,6 +248,29 @@ def reader(data): self.loop.run_until_complete(proto.done) self.assertEqual('CLOSED', proto.state) + def test_write_buffer_full(self): + rpipe, wpipe = os.pipe() + pipeobj = io.open(wpipe, 'wb', 1024) + + proto = MyWritePipeProto(loop=self.loop) + connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) + transport, p = self.loop.run_until_complete(connect) + self.assertIs(p, proto) + self.assertIs(transport, proto.transport) + self.assertEqual('CONNECTED', proto.state) + + for i in range(32): + transport.write(b'x' * 32768) + if proto.paused: + transport.write(b'x' * 32768) + break + else: + self.fail("Didn't reach a full buffer") + + os.close(rpipe) + self.loop.run_until_complete(asyncio.wait_for(proto.done, 1)) + self.assertEqual('CLOSED', proto.state) + class Test_UV_Pipes(_BasePipeTest, tb.UVTestCase): pass diff --git a/uvloop/handles/pipe.pxd b/uvloop/handles/pipe.pxd index f5dc1a93..7c60fc62 100644 --- a/uvloop/handles/pipe.pxd +++ b/uvloop/handles/pipe.pxd @@ -28,10 +28,6 @@ cdef class ReadUnixTransport(UVStream): cdef class WriteUnixTransport(UVStream): - cdef: - uv.uv_poll_t disconnect_listener - bint disconnect_listener_inited - @staticmethod cdef WriteUnixTransport new(Loop loop, object protocol, Server server, object waiter) diff --git a/uvloop/handles/pipe.pyx b/uvloop/handles/pipe.pyx index 581554f4..bd8809a0 100644 --- a/uvloop/handles/pipe.pyx +++ b/uvloop/handles/pipe.pyx @@ -12,6 +12,9 @@ cdef __pipe_init_uv_handle(UVStream handle, Loop loop): err = uv.uv_pipe_init(handle._loop.uvloop, handle._handle, 0) + # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe + # even if it is O_WRONLY, see also #317, libuv/libuv#2058 + handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE if err < 0: handle._abort_init() raise convert_error(err) @@ -147,10 +150,6 @@ cdef class ReadUnixTransport(UVStream): @cython.no_gc_clear cdef class WriteUnixTransport(UVStream): - def __cinit__(self): - self.disconnect_listener_inited = False - self.disconnect_listener.data = NULL - @staticmethod cdef WriteUnixTransport new(Loop loop, object protocol, Server server, object waiter): @@ -167,46 +166,6 @@ cdef class WriteUnixTransport(UVStream): __pipe_init_uv_handle(handle, loop) return handle - cdef _start_reading(self): - # A custom implementation for monitoring for EOF: - # libuv since v1.23.1 prohibits using uv_read_start on - # write-only FDs, so we use a throw-away uv_poll_t handle - # for that purpose, as suggested in - # https://github.com/libuv/libuv/issues/2058. - - cdef int err - - if not self.disconnect_listener_inited: - err = uv.uv_poll_init(self._loop.uvloop, - &self.disconnect_listener, - self._fileno()) - if err < 0: - raise convert_error(err) - self.disconnect_listener.data = self - self.disconnect_listener_inited = True - - err = uv.uv_poll_start(&self.disconnect_listener, - uv.UV_READABLE | uv.UV_DISCONNECT, - __on_write_pipe_poll_event) - if err < 0: - raise convert_error(err) - - cdef _stop_reading(self): - cdef int err - if not self.disconnect_listener_inited: - return - err = uv.uv_poll_stop(&self.disconnect_listener) - if err < 0: - raise convert_error(err) - - cdef _close(self): - if self.disconnect_listener_inited: - self.disconnect_listener.data = NULL - uv.uv_close((&self.disconnect_listener), NULL) - self.disconnect_listener_inited = False - - UVStream._close(self) - cdef _new_socket(self): return __pipe_get_socket(self) @@ -220,25 +179,6 @@ cdef class WriteUnixTransport(UVStream): raise NotImplementedError -cdef void __on_write_pipe_poll_event(uv.uv_poll_t* handle, - int status, int events) with gil: - cdef WriteUnixTransport tr - - if handle.data is NULL: - return - - tr = handle.data - if tr._closed: - return - - if events & uv.UV_DISCONNECT: - try: - tr._stop_reading() - tr._on_eof() - except BaseException as ex: - tr._fatal_error(ex, False) - - cdef class _PipeConnectRequest(UVRequest): cdef: UnixTransport transport diff --git a/uvloop/includes/uv.pxd b/uvloop/includes/uv.pxd index cfb2be4a..0b0f1da9 100644 --- a/uvloop/includes/uv.pxd +++ b/uvloop/includes/uv.pxd @@ -3,6 +3,15 @@ from posix.types cimport gid_t, uid_t from . cimport system +# This is an internal enum UV_HANDLE_READABLE from uv-common.h, used only by +# handles/pipe.pyx to temporarily workaround a libuv issue libuv/libuv#2058, +# before there is a proper fix in libuv. In short, libuv disallowed feeding a +# write-only pipe to uv_read_start(), which was needed by uvloop to detect a +# broken pipe without having to send anything on the write-only end. We're +# setting UV_HANDLE_READABLE on pipe_t to workaround this limitation +# temporarily, please see also #317. +cdef enum: + UV_INTERNAL_HANDLE_READABLE = 0x00004000 cdef extern from "uv.h" nogil: cdef int UV_TCP_IPV6ONLY @@ -82,6 +91,7 @@ cdef extern from "uv.h" nogil: ctypedef struct uv_handle_t: void* data uv_loop_t* loop + unsigned int flags # ... ctypedef struct uv_idle_t: From 88f9712c5c9890422a12883f4745cbfb5d503df4 Mon Sep 17 00:00:00 2001 From: Pierre Tardy Date: Wed, 12 Feb 2020 18:09:33 +0100 Subject: [PATCH 2/2] Add failing unit tests to reproduce issue #312 * closes #313 --- tests/test_process.py | 49 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/test_process.py b/tests/test_process.py index 09d5a508..e2d3269b 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -658,6 +658,55 @@ async def test_subprocess(): loop.run_until_complete(test_subprocess()) + def test_write_huge_stdin_8192(self): + self._test_write_huge_stdin(8192) + + def test_write_huge_stdin_8193(self): + self._test_write_huge_stdin(8193) + + def test_write_huge_stdin_219263(self): + self._test_write_huge_stdin(219263) + + def test_write_huge_stdin_219264(self): + self._test_write_huge_stdin(219264) + + def _test_write_huge_stdin(self, buf_size): + code = ''' +import sys +n = 0 +while True: + line = sys.stdin.readline() + if not line: + print("unexpected EOF", file=sys.stderr) + break + if line == "END\\n": + break + n+=1 +print(n)''' + num_lines = buf_size - len(b"END\n") + args = [sys.executable, b'-W', b'ignore', b'-c', code] + + async def test(): + proc = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE) + data = b"\n" * num_lines + b"END\n" + self.assertEqual(len(data), buf_size) + proc.stdin.write(data) + await asyncio.wait_for(proc.stdin.drain(), timeout=1.0) + try: + await asyncio.wait_for(proc.wait(), timeout=1.0) + except asyncio.TimeoutError: + proc.kill() + proc.stdin.close() + await proc.wait() + raise + out = await proc.stdout.read() + self.assertEqual(int(out), num_lines) + + self.loop.run_until_complete(test()) + class Test_UV_Process(_TestProcess, tb.UVTestCase):