diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index 4167e23364..6ed76b0c42 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -24,7 +24,7 @@ create complex transport configurations. Here's some examples: speak SSL over the network is to wrap an :class:`~trio.ssl.SSLStream` around a :class:`~trio.SocketStream`. -* If you spawn a subprocess then you can get a +* If you spawn a :ref:`subprocess`, you can get a :class:`~trio.abc.SendStream` that lets you write to its stdin, and a :class:`~trio.abc.ReceiveStream` that lets you read from its stdout. If for some reason you wanted to speak SSL to a subprocess, @@ -36,9 +36,6 @@ create complex transport configurations. Here's some examples: ssl_context.check_hostname = False s = SSLStream(StapledStream(process.stdin, process.stdout), ssl_context) - [Note: subprocess support is not implemented yet, but that's the - plan. Unless it is implemented, and I forgot to remove this note.] - * It sometimes happens that you want to connect to an HTTPS server, but you have to go through a web proxy... and the proxy also uses HTTPS. So you end up having to do `SSL-on-top-of-SSL @@ -641,10 +638,169 @@ Asynchronous file objects The underlying synchronous file object. -Subprocesses ------------- +.. module:: trio.subprocess +.. _subprocess: + +Spawning subprocesses with :mod:`trio.subprocess` +------------------------------------------------- + +The :mod:`trio.subprocess` module provides support for spawning +other programs, communicating with them via pipes, sending them signals, +and waiting for them to exit. Its interface is based on the +:mod:`subprocess` module in the standard library; differences +are noted below. + +The constants and exceptions from the standard :mod:`subprocess` +module are re-exported by :mod:`trio.subprocess` unchanged. 
+So, if you like, you can say ``from trio import subprocess`` +and continue referring to ``subprocess.PIPE``, +:exc:`subprocess.CalledProcessError`, and so on, in the same +way you would in synchronous code. + + +.. _subprocess-options: + +Options for starting subprocesses +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The standard :mod:`subprocess` module supports a dizzying array +of `options `__ +for controlling the environment in which a process starts and the +mechanisms used for communicating with it. (If you find that list +overwhelming, you're not alone; you might prefer to start with +just the `frequently used ones +`__.) + +Trio makes use of the :mod:`subprocess` module's logic for spawning processes, +so almost all of these options can be used with their same semantics when +starting subprocesses under Trio. The exceptions are ``encoding``, ``errors``, +``universal_newlines`` (and its 3.7+ alias ``text``), and ``bufsize``; +Trio always uses unbuffered byte streams for communicating with a process, +so these options don't make sense. Text I/O should use a layer +on top of the raw byte streams, just as it does with sockets. +[This layer does not yet exist, but is in the works.] + + +Running a process and waiting for it to finish +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +We're `working on ` +figuring out the best API for common higher-level subprocess operations. 
+In the meantime, you can implement something like the standard library +:func:`subprocess.run` in terms of :class:`trio.subprocess.Process` +as follows:: + + async def run( + command, *, input=None, capture_output=False, **options + ): + if input is not None: + options['stdin'] = subprocess.PIPE + if capture_output: + options['stdout'] = options['stderr'] = subprocess.PIPE + + stdout_chunks = [] + stderr_chunks = [] + + async with trio.subprocess.Process(command, **options) as proc: + + async def feed_input(): + async with proc.stdin: + if input: + try: + await proc.stdin.send_all(input) + except trio.BrokenResourceError: + pass + + async def read_output(stream, chunks): + async with stream: + while True: + chunk = await stream.receive_some(32768) + if not chunk: + break + chunks.append(chunk) + + async with trio.open_nursery() as nursery: + if proc.stdin is not None: + nursery.start_soon(feed_input) + if proc.stdout is not None: + nursery.start_soon(read_output, proc.stdout, stdout_chunks) + if proc.stderr is not None: + nursery.start_soon(read_output, proc.stderr, stderr_chunks) + await proc.wait() + + stdout = b"".join(stdout_chunks) if proc.stdout is not None else None + stderr = b"".join(stderr_chunks) if proc.stderr is not None else None + + if proc.returncode: + raise subprocess.CalledProcessError( + proc.returncode, proc.args, output=stdout, stderr=stderr + ) + else: + return subprocess.CompletedProcess( + proc.args, proc.returncode, stdout, stderr + ) + + +Interacting with a process as it runs +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +You can spawn a subprocess by creating an instance of +:class:`trio.subprocess.Process` and then interact with it using its +:attr:`~trio.subprocess.Process.stdin`, +:attr:`~trio.subprocess.Process.stdout`, and/or +:attr:`~trio.subprocess.Process.stderr` streams. + +.. autoclass:: trio.subprocess.Process + :members: + -`Not implemented yet! 
`__ +Differences from the standard library +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +* All arguments to the constructor of + :class:`~trio.subprocess.Process`, except the command to run, must be + passed using keywords. + +* :func:`~subprocess.call`, :func:`~subprocess.check_call`, and + :func:`~subprocess.check_output` are not provided. + +* :meth:`~subprocess.Popen.communicate` is not provided as a method on + :class:`~trio.subprocess.Process` objects; use a higher-level + function instead, or write the loop yourself if + you have unusual needs. :meth:`~subprocess.Popen.communicate` has + quite unusual cancellation behavior in the standard library (on some + platforms it spawns a background thread which continues to read from + the child process even after the timeout has expired) and we wanted + to provide an interface with fewer surprises. + +* :meth:`~trio.subprocess.Process.wait` is an async function that does + not take a ``timeout`` argument; combine it with + :func:`~trio.fail_after` if you want a timeout. + +* Text I/O is not supported: you may not use the + :class:`~trio.subprocess.Process` constructor arguments + ``universal_newlines`` (or its 3.7+ alias ``text``), ``encoding``, + or ``errors``. + +* :attr:`~trio.subprocess.Process.stdin` is a :class:`~trio.abc.SendStream` and + :attr:`~trio.subprocess.Process.stdout` and :attr:`~trio.subprocess.Process.stderr` + are :class:`~trio.abc.ReceiveStream`\s, rather than file objects. The + :class:`~trio.subprocess.Process` constructor argument ``bufsize`` is + not supported since there would be no file object to pass it to. + +* :meth:`~trio.subprocess.Process.aclose` (and thus also + ``__aexit__``) behave like the standard :class:`~subprocess.Popen` + context manager exit (close pipes to the process, then wait for it + to exit), but add additional behavior if cancelled: kill the process + and wait for it to finish terminating. 
This is useful for scoping + the lifetime of a simple subprocess that doesn't spawn any children + of its own. (For subprocesses that do in turn spawn their own + subprocesses, there is not currently any way to clean up the whole + tree; moreover, using the :class:`Process` context manager in such + cases is likely to be counterproductive as killing the top-level + subprocess leaves it no chance to do any cleanup of its children + that might be desired. You'll probably want to write your own + supervision logic in that case.) Signals diff --git a/newsfragments/4.feature.rst b/newsfragments/4.feature.rst new file mode 100644 index 0000000000..0a34bf041d --- /dev/null +++ b/newsfragments/4.feature.rst @@ -0,0 +1,6 @@ +Initial :ref:`subprocess support `. +Add :class:`trio.subprocess.Process`, an async wrapper around the stdlib +:class:`subprocess.Popen` class, which permits spawning subprocesses +and communicating with them over standard Trio streams. +:mod:`trio.subprocess` also reexports all the stdlib :mod:`subprocess` +exceptions and constants for convenience. diff --git a/notes-to-self/subprocess-notes.txt b/notes-to-self/subprocess-notes.txt new file mode 100644 index 0000000000..e33c835640 --- /dev/null +++ b/notes-to-self/subprocess-notes.txt @@ -0,0 +1,73 @@ +# subprocesses are a huge hassle +# on Linux there is simply no way to async wait for a child to exit except by +# messing with SIGCHLD and that is ... *such* a mess. Not really +# tenable. We're better off trying os.waitpid(..., os.WNOHANG), and if that +# says the process is still going then spawn a thread to sit in waitpid. +# ......though that waitpid is non-cancellable so ugh. this is a problem, +# becaues it's also mutating -- you only get to waitpid() once, and you have +# to do it, because zombies. 
I guess we could make sure the waitpid thread is +# daemonic and either it gets back to us eventually (even if our first call to +# 'await wait()' is cancelled, maybe another one won't be), or else we go away +# and don't care anymore. +# I guess simplest is just to spawn a thread at the same time as we spawn the +# process, with more reasonable notification semantics. +# or we can poll every 100 ms or something, sigh. + +# on Mac/*BSD then kqueue works, go them. (maybe have WNOHANG after turning it +# on to avoid a race condition I guess) + +# on Windows, you can either do the thread thing, or something involving +# WaitForMultipleObjects, or the Job Object API: +# https://stackoverflow.com/questions/17724859/detecting-exit-failure-of-child-processes-using-iocp-c-windows +# (see also the comments here about using the Job Object API: +# https://stackoverflow.com/questions/23434842/python-how-to-kill-child-processes-when-parent-dies/23587108#23587108) +# however the docs say: +# "Note that, with the exception of limits set with the +# JobObjectNotificationLimitInformation information class, delivery of +# messages to the completion port is not guaranteed; failure of a message to +# arrive does not necessarily mean that the event did not occur" +# +# oh windows wtf + +# We'll probably want to mess with the job API anyway for worker processes +# (b/c that's the reliable way to make sure we never leave residual worker +# processes around after exiting, see that stackoverflow question again), so +# maybe this isn't too big a hassle? waitpid is probably easiest for the +# first-pass implementation though. + +# the handle version has the same issues as waitpid on Linux, except I guess +# that on windows the waitpid equivalent doesn't consume the handle. +# -- wait no, the windows equivalent takes a timeout! and we know our +# cancellation deadline going in, so that's actually okay. (Still need to use +# a thread but whatever.) 
+ +# asyncio does RegisterWaitForSingleObject with a callback that does +# PostQueuedCompletionStatus. +# this is just a thread pool in disguise (and in principle could have weird +# problems if you have enough children and run out of threads) +# it's possible we could do something with a thread that just sits in +# an alertable state and handle callbacks...? though hmm, maybe the set of +# events that can notify via callbacks is equivalent to the set that can +# notify via IOCP. +# there's WaitForMultipleObjects to let multiple waits share a thread I +# guess. +# you can wake up a WaitForMultipleObjectsEx on-demand by using QueueUserAPC +# to send a no-op APC to its thread. +# this is also a way to cancel a WaitForSingleObjectEx, actually. So it +# actually is possible to cancel the equivalent of a waitpid on Windows. + +# Potentially useful observation: you *can* use a socket as the +# stdin/stdout/stderr for a child, iff you create that socket *without* +# WSA_FLAG_OVERLAPPED: +# http://stackoverflow.com/a/5725609 +# Here's ncm's Windows implementation of socketpair, which has a flag to +# control whether one of the sockets has WSA_FLAG_OVERLAPPED set: +# https://github.com/ncm/selectable-socketpair/blob/master/socketpair.c +# (it also uses listen(1) so it's robust against someone intercepting things, +# unlike the version in socket.py... not sure anyone really cares, but +# hey. OTOH it only supports AF_INET, while socket.py supports AF_INET6, +# fancy.) +# (or it would be trivial to (re)implement in python, using either +# socket.socketpair or ncm's version as a model, given a cffi function to +# create the non-overlapped socket in the first place then just pass it into +# the socket.socket constructor (avoiding the dup() that fromfd does).) diff --git a/trio/__init__.py b/trio/__init__.py index 5e14fa1f09..c7478c831b 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -67,6 +67,7 @@ from . import socket from . import abc from . import ssl +from . 
import subprocess # Not imported by default: testing if False: from . import testing diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 44d298c469..429ba9d9f7 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -19,6 +19,7 @@ from ._windows_cffi import ( ffi, kernel32, + ntdll, INVALID_HANDLE_VALUE, raise_winerror, ErrorCodes, @@ -64,7 +65,12 @@ # - when binding handles to the IOCP, we always set the completion key to 0. # when dispatching received events, when the completion key is 0 we dispatch # based on lpOverlapped -# - thread-safe wakeup uses completion key 1 +# - when we try to cancel an I/O operation and the cancellation fails, +# we post a completion with completion key 1; if this arrives before the +# real completion (with completion key 0) we assume the user forgot to +# call register_with_iocp on their handle, and raise an error accordingly +# (without this logic we'd hang forever uninterruptibly waiting for the +# completion that never arrives) # - other completion keys are available for user use # handles: @@ -126,9 +132,13 @@ def __init__(self): self._iocp_queue = deque() self._iocp_thread = None self._overlapped_waiters = {} + self._posted_too_late_to_cancel = set() self._completion_key_queues = {} - # Completion key 0 is reserved for regular IO events - self._completion_key_counter = itertools.count(1) + # Completion key 0 is reserved for regular IO events. + # Completion key 1 is used by the fallback post from a regular + # IO event's abort_fn to catch the user forgetting to call + # register_wiht_iocp. 
+ self._completion_key_counter = itertools.count(2) # {stdlib socket object: task} # except that wakeup socket is mapped to None @@ -238,6 +248,44 @@ def do_select(): # Regular I/O event, dispatch on lpOverlapped waiter = self._overlapped_waiters.pop(entry.lpOverlapped) _core.reschedule(waiter) + elif entry.lpCompletionKey == 1: + # Post made by a regular I/O event's abort_fn + # after it failed to cancel the I/O. If we still + # have a waiter with this lpOverlapped, we didn't + # get the regular I/O completion and almost + # certainly the user forgot to call + # register_with_iocp. + self._posted_too_late_to_cancel.remove(entry.lpOverlapped) + try: + waiter = self._overlapped_waiters.pop( + entry.lpOverlapped + ) + except KeyError: + # Looks like the actual completion got here + # before this fallback post did -- we're in + # the "expected" case of too-late-to-cancel, + # where the user did nothing wrong and the + # main thread just got backlogged relative to + # the IOCP thread somehow. Nothing more to do. + pass + else: + exc = _core.TrioInternalError( + "Failed to cancel overlapped I/O in {} and didn't " + "receive the completion either. Did you forget to " + "call register_with_iocp()?".format(waiter.name) + ) + # Raising this out of handle_io ensures that + # the user will see our message even if some + # other task is in an uncancellable wait due + # to the same underlying forgot-to-register + # issue (if their CancelIoEx succeeds, we + # have no way of noticing that their completion + # won't arrive). Unfortunately it loses the + # task traceback. 
If you're debugging this + # error and can't tell where it's coming from, + # try changing this line to + # _core.reschedule(waiter, outcome.Error(exc)) + raise exc else: # dispatch on lpCompletionKey queue = self._completion_key_queues[entry.lpCompletionKey] @@ -289,13 +337,15 @@ def current_iocp(self): @_public def register_with_iocp(self, handle): - handle = _handle(obj) + handle = _handle(handle) # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862(v=vs.85).aspx + # INVALID_PARAMETER seems to be used for both "can't register + # because not opened in OVERLAPPED mode" and "already registered" _check(kernel32.CreateIoCompletionPort(handle, self._iocp, 0, 0)) @_public async def wait_overlapped(self, handle, lpOverlapped): - handle = _handle(obj) + handle = _handle(handle) if isinstance(lpOverlapped, int): lpOverlapped = ffi.cast("LPOVERLAPPED", lpOverlapped) if lpOverlapped in self._overlapped_waiters: @@ -313,16 +363,60 @@ def abort(raise_cancel_): # possible -- the docs are pretty unclear. nonlocal raise_cancel raise_cancel = raise_cancel_ - _check(kernel32.CancelIoEx(handle, lpOverlapped)) + try: + _check(kernel32.CancelIoEx(handle, lpOverlapped)) + except OSError as exc: + if exc.winerror == ErrorCodes.ERROR_NOT_FOUND: + # Too late to cancel. If this happens because the + # operation is already completed, we don't need to + # do anything; presumably the IOCP thread will be + # reporting back about that completion soon. But + # another possibility is that the operation was + # performed on a handle that wasn't registered + # with our IOCP (ie, the user forgot to call + # register_with_iocp), in which case we're just + # never going to see the completion. To avoid an + # uncancellable infinite sleep in the latter case, + # we'll PostQueuedCompletionStatus here, and if + # our post arrives before the original completion + # does, we'll assume the handle wasn't registered. 
+ _check( + kernel32.PostQueuedCompletionStatus( + self._iocp, 0, 1, lpOverlapped + ) + ) + + # Keep the lpOverlapped referenced so its address + # doesn't get reused until our posted completion + # status has been processed. Otherwise, we can + # get confused about which completion goes with + # which I/O. + self._posted_too_late_to_cancel.add(lpOverlapped) + + else: # pragma: no cover + raise TrioInternalError( + "CancelIoEx failed with unexpected error" + ) from exc return _core.Abort.FAILED await _core.wait_task_rescheduled(abort) if lpOverlapped.Internal != 0: - if lpOverlapped.Internal == ErrorCodes.ERROR_OPERATION_ABORTED: - assert raise_cancel is not None - raise_cancel() + # the lpOverlapped reports the error as an NT status code, + # which we must convert back to a Win32 error code before + # it will produce the right sorts of exceptions + code = ntdll.RtlNtStatusToDosError(lpOverlapped.Internal) + if code == ErrorCodes.ERROR_OPERATION_ABORTED: + if raise_cancel is not None: + raise_cancel() + else: + # We didn't request this cancellation, so assume + # it happened due to the underlying handle being + # closed before the operation could complete. + raise _core.ClosedResourceError( + "another task closed this resource" + ) else: - raise_winerror(lpOverlapped.Internal) + raise_winerror(code) @_public @contextmanager @@ -372,21 +466,117 @@ def notify_socket_close(self, sock): ) _core.reschedule(task, outcome.Error(exc)) - # This has cffi-isms in it and is untested... but it demonstrates the - # logic we'll want when we start actually using overlapped I/O. 
- # - # @_public - # async def perform_overlapped(self, handle, submit_fn): - # # submit_fn(lpOverlapped) submits some I/O - # # it may raise an OSError with ERROR_IO_PENDING - # await _core.checkpoint_if_cancelled() - # self.register_with_iocp(handle) - # lpOverlapped = ffi.new("LPOVERLAPPED") - # try: - # submit_fn(lpOverlapped) - # except OSError as exc: - # if exc.winerror != Error.ERROR_IO_PENDING: - # await _core.cancel_shielded_checkpoint() - # raise - # await self.wait_overlapped(handle, lpOverlapped) - # return lpOverlapped + async def _perform_overlapped(self, handle, submit_fn): + # submit_fn(lpOverlapped) submits some I/O + # it may raise an OSError with ERROR_IO_PENDING + # the handle must already be registered using + # register_with_iocp(handle) + await _core.checkpoint_if_cancelled() + lpOverlapped = ffi.new("LPOVERLAPPED") + try: + submit_fn(lpOverlapped) + except OSError as exc: + if exc.winerror != ErrorCodes.ERROR_IO_PENDING: + await _core.cancel_shielded_checkpoint() + raise + await self.wait_overlapped(handle, lpOverlapped) + return lpOverlapped + + @_public + async def write_overlapped(self, handle, data, file_offset=0): + # Make sure we keep our buffer referenced until the I/O completes. + # For typical types of `data` (bytes, bytearray) the memory we + # pass is part of the existing allocation, but the buffer protocol + # allows for other possibilities. + cbuf = ffi.from_buffer(data) + + def submit_write(lpOverlapped): + # yes, these are the real documented names + offset_fields = lpOverlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME + offset_fields.Offset = file_offset & 0xffffffff + offset_fields.OffsetHigh = file_offset >> 32 + _check( + kernel32.WriteFile( + _handle(handle), + ffi.cast("LPCVOID", cbuf), + len(cbuf), + ffi.NULL, + lpOverlapped, + ) + ) + + try: + lpOverlapped = await self._perform_overlapped(handle, submit_write) + # this is "number of bytes transferred" + return lpOverlapped.InternalHigh + finally: + # There's a trap here. 
Let's say the incoming `data` is a + # memoryview, and our caller is bounding its lifetime with + # a context manager, maybe because they want to break a + # larger buffer into multiple writes without copying: + # + # with memoryview(big_buffer) as view: + # total_sent = 0 + # while total_sent < len(view): + # total_sent += await write_overlapped( + # handle, big_buffer[total_sent : total_sent + 8192] + # ) + # + # Our FFI buffer object holds a reference to the memoryview's + # buffer, and the memoryview knows about this. If the + # memoryview context is exited (equivalent to calling + # memoryview.release()) while that reference is still held, + # we get a BufferError. + # + # Unfortunately, there seems to be no way to drop that + # reference without destroying the FFI buffer object, + # alhough one might be coming soon. + # (https://bitbucket.org/cffi/cffi/issues/395/) + # We can't even call __del__, because there's no __del__ + # exposed. On CPython, when we return normally, the frame + # and its locals are destroyed, but when we throw an + # exception, they remain referenced by the traceback. + # So, we need to drop the reference to the FFI buffer + # explicitly when unwinding. + # + # This doesn't help with destruction on PyPy, but PyPy + # doesn't currently track buffer references in the same + # way as CPython does, so there's no need for a workaround + # there. + # + del cbuf + + @_public + async def readinto_overlapped(self, handle, buffer, file_offset=0): + # This will throw a reasonable error if `buffer` is read-only + # or doesn't support the buffer protocol, and perform no + # operation otherwise. A future release of CFFI will support + # ffi.from_buffer(foo, require_writable=True) to do the same + # thing less circumlocutiously. + # (https://bitbucket.org/cffi/cffi/issues/394/) + ffi.memmove(buffer, b"", 0) + + # As in write_overlapped, we want to ensure the buffer stays + # alive for the duration of the I/O. 
+ cbuf = ffi.from_buffer(buffer) + + def submit_read(lpOverlapped): + offset_fields = lpOverlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME + offset_fields.Offset = file_offset & 0xffffffff + offset_fields.OffsetHigh = file_offset >> 32 + _check( + kernel32.ReadFile( + _handle(handle), + ffi.cast("LPVOID", cbuf), + len(cbuf), + ffi.NULL, + lpOverlapped, + ) + ) + + try: + lpOverlapped = await self._perform_overlapped(handle, submit_read) + return lpOverlapped.InternalHigh + finally: + # See discussion in write_overlapped() + del cbuf diff --git a/trio/_core/_windows_cffi.py b/trio/_core/_windows_cffi.py index 16dd9a232b..f05e871ce3 100644 --- a/trio/_core/_windows_cffi.py +++ b/trio/_core/_windows_cffi.py @@ -11,8 +11,12 @@ typedef PVOID HANDLE; typedef unsigned long DWORD; typedef unsigned long ULONG; +typedef unsigned int NTSTATUS; typedef unsigned long u_long; typedef ULONG *PULONG; +typedef const void *LPCVOID; +typedef void *LPVOID; +typedef const wchar_t *LPCWSTR; typedef uintptr_t ULONG_PTR; typedef uintptr_t UINT_PTR; @@ -53,6 +57,16 @@ _In_ DWORD NumberOfConcurrentThreads ); +HANDLE CreateFileW( + LPCWSTR lpFileName, + DWORD dwDesiredAccess, + DWORD dwShareMode, + LPSECURITY_ATTRIBUTES lpSecurityAttributes, + DWORD dwCreationDisposition, + DWORD dwFlagsAndAttributes, + HANDLE hTemplateFile +); + BOOL WINAPI CloseHandle( _In_ HANDLE hObject ); @@ -78,6 +92,22 @@ _In_opt_ LPOVERLAPPED lpOverlapped ); +BOOL WriteFile( + HANDLE hFile, + LPCVOID lpBuffer, + DWORD nNumberOfBytesToWrite, + LPDWORD lpNumberOfBytesWritten, + LPOVERLAPPED lpOverlapped +); + +BOOL ReadFile( + HANDLE hFile, + LPVOID lpBuffer, + DWORD nNumberOfBytesToRead, + LPDWORD lpNumberOfBytesRead, + LPOVERLAPPED lpOverlapped +); + BOOL WINAPI SetConsoleCtrlHandler( _In_opt_ void* HandlerRoutine, _In_ BOOL Add @@ -110,6 +140,10 @@ DWORD dwMilliseconds ); +ULONG RtlNtStatusToDosError( + NTSTATUS Status +); + """ # cribbed from pywincffi @@ -130,6 +164,7 @@ ffi.cdef(LIB) kernel32 = ffi.dlopen("kernel32.dll") 
+ntdll = ffi.dlopen("ntdll.dll") INVALID_HANDLE_VALUE = ffi.cast("HANDLE", -1) @@ -167,3 +202,18 @@ class ErrorCodes(enum.IntEnum): ERROR_OPERATION_ABORTED = 995 ERROR_ABANDONED_WAIT_0 = 735 ERROR_INVALID_HANDLE = 6 + ERROR_INVALID_PARMETER = 87 + ERROR_NOT_FOUND = 1168 + + +class FileFlags(enum.IntEnum): + GENERIC_READ = 0x80000000 + FILE_FLAG_OVERLAPPED = 0x40000000 + FILE_SHARE_READ = 1 + FILE_SHARE_WRITE = 2 + FILE_SHARE_DELETE = 4 + CREATE_NEW = 1 + CREATE_ALWAYS = 2 + OPEN_EXISTING = 3 + OPEN_ALWAYS = 4 + TRUNCATE_EXISTING = 5 diff --git a/trio/_core/tests/test_windows.py b/trio/_core/tests/test_windows.py index 18436c7704..42ee79060d 100644 --- a/trio/_core/tests/test_windows.py +++ b/trio/_core/tests/test_windows.py @@ -1,4 +1,6 @@ import os +import tempfile +from contextlib import contextmanager import pytest @@ -6,9 +8,13 @@ # Mark all the tests in this file as being windows-only pytestmark = pytest.mark.skipif(not on_windows, reason="windows only") -from ... import _core +from .tutil import slow, gc_collect_harder +from ... 
import _core, sleep, move_on_after +from ...testing import wait_all_tasks_blocked if on_windows: - from .._windows_cffi import ffi, kernel32 + from .._windows_cffi import ( + ffi, kernel32, INVALID_HANDLE_VALUE, raise_winerror, FileFlags + ) # The undocumented API that this is testing should be changed to stop using @@ -45,5 +51,133 @@ async def post(key): print("end loop") -# XX test setting the iomanager._iocp to something weird to make sure that the -# IOCP thread can send exceptions back to the main thread +async def test_readinto_overlapped(): + data = b"1" * 1024 + b"2" * 1024 + b"3" * 1024 + b"4" * 1024 + buffer = bytearray(len(data)) + + with tempfile.TemporaryDirectory() as tdir: + tfile = os.path.join(tdir, "numbers.txt") + with open(tfile, "wb") as fp: + fp.write(data) + fp.flush() + + rawname = tfile.encode("utf-16le") + b"\0\0" + rawname_buf = ffi.from_buffer(rawname) + handle = kernel32.CreateFileW( + ffi.cast("LPCWSTR", rawname_buf), + FileFlags.GENERIC_READ, + FileFlags.FILE_SHARE_READ, + ffi.NULL, # no security attributes + FileFlags.OPEN_EXISTING, + FileFlags.FILE_FLAG_OVERLAPPED, + ffi.NULL, # no template file + ) + if handle == INVALID_HANDLE_VALUE: # pragma: no cover + raise_winerror() + + try: + with memoryview(buffer) as buffer_view: + + async def read_region(start, end): + await _core.readinto_overlapped( + handle, + buffer_view[start:end], + start, + ) + + _core.register_with_iocp(handle) + async with _core.open_nursery() as nursery: + for start in range(0, 4096, 512): + nursery.start_soon(read_region, start, start + 512) + + assert buffer == data + + with pytest.raises(BufferError): + await _core.readinto_overlapped(handle, b"immutable") + finally: + kernel32.CloseHandle(handle) + + +@contextmanager +def pipe_with_overlapped_read(): + from asyncio.windows_utils import pipe + import msvcrt + + read_handle, write_handle = pipe(overlapped=(True, False)) + try: + write_fd = msvcrt.open_osfhandle(write_handle, 0) + yield os.fdopen(write_fd, 
"wb", closefd=False), read_handle + finally: + kernel32.CloseHandle(ffi.cast("HANDLE", read_handle)) + kernel32.CloseHandle(ffi.cast("HANDLE", write_handle)) + + +def test_forgot_to_register_with_iocp(): + with pipe_with_overlapped_read() as (write_fp, read_handle): + with write_fp: + write_fp.write(b"test\n") + + left_run_yet = False + + async def main(): + target = bytearray(1) + try: + async with _core.open_nursery() as nursery: + nursery.start_soon( + _core.readinto_overlapped, + read_handle, + target, + name="xyz" + ) + await wait_all_tasks_blocked() + nursery.cancel_scope.cancel() + finally: + # Run loop is exited without unwinding running tasks, so + # we don't get here until the main() coroutine is GC'ed + assert left_run_yet + + with pytest.raises(_core.TrioInternalError) as exc_info: + _core.run(main) + left_run_yet = True + assert "Failed to cancel overlapped I/O in xyz " in str(exc_info.value) + assert "forget to call register_with_iocp()?" in str(exc_info.value) + + # Make sure the Nursery.__del__ assertion about dangling children + # gets put with the correct test + del exc_info + gc_collect_harder() + + +@slow +async def test_too_late_to_cancel(): + import time + + with pipe_with_overlapped_read() as (write_fp, read_handle): + _core.register_with_iocp(read_handle) + target = bytearray(6) + async with _core.open_nursery() as nursery: + # Start an async read in the background + nursery.start_soon(_core.readinto_overlapped, read_handle, target) + await wait_all_tasks_blocked() + + # Synchronous write to the other end of the pipe + with write_fp: + write_fp.write(b"test1\ntest2\n") + + # Note: not trio.sleep! We're making sure the OS level + # ReadFile completes, before trio has a chance to execute + # another checkpoint and notice it completed. + time.sleep(1) + nursery.cancel_scope.cancel() + assert target[:6] == b"test1\n" + + # Do another I/O to make sure we've actually processed the + # fallback completion that was posted when CancelIoEx failed. 
+ assert await _core.readinto_overlapped(read_handle, target) == 6 + assert target[:6] == b"test2\n" + + +# XX: test setting the iomanager._iocp to something weird to make +# sure that the IOCP thread can send exceptions back to the main thread. +# --> it's not clear if this is actually possible? we just get +# ERROR_INVALID_HANDLE which looks like the IOCP was closed (not an error) diff --git a/trio/_subprocess.py b/trio/_subprocess.py index e33c835640..b1ab84ffd2 100644 --- a/trio/_subprocess.py +++ b/trio/_subprocess.py @@ -1,73 +1,221 @@ -# subprocesses are a huge hassle -# on Linux there is simply no way to async wait for a child to exit except by -# messing with SIGCHLD and that is ... *such* a mess. Not really -# tenable. We're better off trying os.waitpid(..., os.WNOHANG), and if that -# says the process is still going then spawn a thread to sit in waitpid. -# ......though that waitpid is non-cancellable so ugh. this is a problem, -# becaues it's also mutating -- you only get to waitpid() once, and you have -# to do it, because zombies. I guess we could make sure the waitpid thread is -# daemonic and either it gets back to us eventually (even if our first call to -# 'await wait()' is cancelled, maybe another one won't be), or else we go away -# and don't care anymore. -# I guess simplest is just to spawn a thread at the same time as we spawn the -# process, with more reasonable notification semantics. -# or we can poll every 100 ms or something, sigh. - -# on Mac/*BSD then kqueue works, go them. 
(maybe have WNOHANG after turning it -# on to avoid a race condition I guess) - -# on Windows, you can either do the thread thing, or something involving -# WaitForMultipleObjects, or the Job Object API: -# https://stackoverflow.com/questions/17724859/detecting-exit-failure-of-child-processes-using-iocp-c-windows -# (see also the comments here about using the Job Object API: -# https://stackoverflow.com/questions/23434842/python-how-to-kill-child-processes-when-parent-dies/23587108#23587108) -# however the docs say: -# "Note that, with the exception of limits set with the -# JobObjectNotificationLimitInformation information class, delivery of -# messages to the completion port is not guaranteed; failure of a message to -# arrive does not necessarily mean that the event did not occur" -# -# oh windows wtf - -# We'll probably want to mess with the job API anyway for worker processes -# (b/c that's the reliable way to make sure we never leave residual worker -# processes around after exiting, see that stackoverflow question again), so -# maybe this isn't too big a hassle? waitpid is probably easiest for the -# first-pass implementation though. - -# the handle version has the same issues as waitpid on Linux, except I guess -# that on windows the waitpid equivalent doesn't consume the handle. -# -- wait no, the windows equivalent takes a timeout! and we know our -# cancellation deadline going in, so that's actually okay. (Still need to use -# a thread but whatever.) - -# asyncio does RegisterWaitForSingleObject with a callback that does -# PostQueuedCompletionStatus. -# this is just a thread pool in disguise (and in principle could have weird -# problems if you have enough children and run out of threads) -# it's possible we could do something with a thread that just sits in -# an alertable state and handle callbacks...? though hmm, maybe the set of -# events that can notify via callbacks is equivalent to the set that can -# notify via IOCP. 
-# there's WaitForMultipleObjects to let multiple waits share a thread I -# guess. -# you can wake up a WaitForMultipleObjectsEx on-demand by using QueueUserAPC -# to send a no-op APC to its thread. -# this is also a way to cancel a WaitForSingleObjectEx, actually. So it -# actually is possible to cancel the equivalent of a waitpid on Windows. - -# Potentially useful observation: you *can* use a socket as the -# stdin/stdout/stderr for a child, iff you create that socket *without* -# WSA_FLAG_OVERLAPPED: -# http://stackoverflow.com/a/5725609 -# Here's ncm's Windows implementation of socketpair, which has a flag to -# control whether one of the sockets has WSA_FLAG_OVERLAPPED set: -# https://github.com/ncm/selectable-socketpair/blob/master/socketpair.c -# (it also uses listen(1) so it's robust against someone intercepting things, -# unlike the version in socket.py... not sure anyone really cares, but -# hey. OTOH it only supports AF_INET, while socket.py supports AF_INET6, -# fancy.) -# (or it would be trivial to (re)implement in python, using either -# socket.socketpair or ncm's version as a model, given a cffi function to -# create the non-overlapped socket in the first place then just pass it into -# the socket.socket constructor (avoiding the dup() that fromfd does).) +import math +import os +import select +import subprocess +import sys + +from . import _core +from ._abc import AsyncResource +from ._sync import CapacityLimiter, Lock +from ._threads import run_sync_in_worker_thread +from ._subprocess_platform import ( + wait_child_exiting, create_pipe_to_child_stdin, + create_pipe_from_child_output +) + +__all__ = ["Process"] + + +class Process(AsyncResource): + """Execute a child program in a new process. + + Like :class:`subprocess.Popen`, but async. + + Constructing a :class:`Process` immediately spawns the child + process, or throws an :exc:`OSError` if the spawning fails (for + example, if the specified command could not be found). 
+ After construction, you can interact with the child process + by writing data to its :attr:`stdin` stream (a + :class:`~trio.abc.SendStream`), reading data from its :attr:`stdout` + and/or :attr:`stderr` streams (both :class:`~trio.abc.ReceiveStream`\s), + sending it signals using :meth:`terminate`, :meth:`kill`, or + :meth:`send_signal`, and waiting for it to exit using :meth:`wait`. + + Each standard stream is only available if it was specified at + :class:`Process` construction time that a pipe should be created + for it. For example, if you constructed with + ``stdin=subprocess.PIPE``, you can write to the :attr:`stdin` + stream, else :attr:`stdin` will be ``None``. + + :class:`Process` implements :class:`~trio.abc.AsyncResource`, + so you can use it as an async context manager or call its + :meth:`aclose` method directly. "Closing" a :class:`Process` + will close any pipes to the child and wait for it to exit; + if cancelled, the child will be forcibly killed and we will + ensure it has finished exiting before allowing the cancellation + to propagate. It is *strongly recommended* that process lifetime + be scoped using an ``async with`` block wherever possible, to + avoid winding up with processes hanging around longer than you + were planning on. + + Args: + command (str or list): The command to run. Typically this is a + list of strings such as ``['ls', '-l', 'directory with spaces']``, + where the first element names the executable to invoke and the other + elements specify its arguments. If ``shell=True`` is given as + an option, ``command`` should be a single string like + ``"ls -l 'directory with spaces'"``, which will + split into words following the shell's quoting rules. + stdin: Specifies what the child process's standard input + stream should connect to: output written by the parent + (``subprocess.PIPE``), nothing (``subprocess.DEVNULL``), + or an open file (pass a file descriptor or something whose + ``fileno`` method returns one). 
If ``stdin`` is unspecified,
+          the child process will have the same standard input stream
+          as its parent.
+        stdout: Like ``stdin``, but for the child process's standard output
+          stream.
+        stderr: Like ``stdin``, but for the child process's standard error
+          stream. An additional value ``subprocess.STDOUT`` is supported,
+          which causes the child's standard output and standard error
+          messages to be intermixed on a single standard output stream,
+          attached to whatever the ``stdout`` option says to attach it to.
+        **options: Other :ref:`general subprocess options <subprocess-options>`
+          are also accepted.
+
+    Attributes:
+      args (str or list): The ``command`` passed at construction time,
+          specifying the process to execute and its arguments.
+      pid (int): The process ID of the child process managed by this object.
+      stdin (trio.abc.SendStream or None): A stream connected to the child's
+          standard input stream: when you write bytes here, they become available
+          for the child to read. Only available if the :class:`Process`
+          was constructed using ``stdin=PIPE``; otherwise this will be None.
+      stdout (trio.abc.ReceiveStream or None): A stream connected to
+          the child's standard output stream: when the child writes to
+          standard output, the written bytes become available for you
+          to read here. Only available if the :class:`Process` was
+          constructed using ``stdout=PIPE``; otherwise this will be None.
+      stderr (trio.abc.ReceiveStream or None): A stream connected to
+          the child's standard error stream: when the child writes to
+          standard error, the written bytes become available for you
+          to read here. Only available if the :class:`Process` was
+          constructed using ``stderr=PIPE``; otherwise this will be None.
+
+    """
+
+    universal_newlines = False
+    encoding = None
+    errors = None
+
+    # Available for the per-platform wait_child_exiting() implementations
+    # to stash some state; waitid platforms use this to avoid spawning
+    # arbitrarily many threads if wait() keeps getting cancelled.
+ _wait_for_exit_data = None + + def __init__( + self, args, *, stdin=None, stdout=None, stderr=None, **options + ): + for key in ( + 'universal_newlines', 'text', 'encoding', 'errors', 'bufsize' + ): + if options.get(key): + raise TypeError( + "trio.subprocess.Process only supports communicating over " + "unbuffered byte streams; the '{}' option is not supported" + .format(key) + ) + + self.stdin = None + self.stdout = None + self.stderr = None + + if stdin == subprocess.PIPE: + self.stdin, stdin = create_pipe_to_child_stdin() + if stdout == subprocess.PIPE: + self.stdout, stdout = create_pipe_from_child_output() + if stderr == subprocess.STDOUT: + # If we created a pipe for stdout, pass the same pipe for + # stderr. If stdout was some non-pipe thing (DEVNULL or a + # given FD), pass the same thing. If stdout was passed as + # None, keep stderr as STDOUT to allow subprocess to dup + # our stdout. Regardless of which of these is applicable, + # don't create a new trio stream for stderr -- if stdout + # is piped, stderr will be intermixed on the stdout stream. + if stdout is not None: + stderr = stdout + elif stderr == subprocess.PIPE: + self.stderr, stderr = create_pipe_from_child_output() + + try: + self._proc = subprocess.Popen( + args, stdin=stdin, stdout=stdout, stderr=stderr, **options + ) + finally: + # Close the parent's handle for each child side of a pipe; + # we want the child to have the only copy, so that when + # it exits we can read EOF on our side. + if self.stdin is not None: + os.close(stdin) + if self.stdout is not None: + os.close(stdout) + if self.stderr is not None: + os.close(stderr) + + self.args = self._proc.args + self.pid = self._proc.pid + + @property + def returncode(self): + """The exit status of the process (an integer), or ``None`` if it has + not exited. + + Negative values indicate termination due to a signal (on UNIX only). 
+ Like :attr:`subprocess.Popen.returncode`, this is not updated outside + of a call to :meth:`wait` or :meth:`poll`. + """ + return self._proc.returncode + + async def aclose(self): + """Close any pipes we have to the process (both input and output) + and wait for it to exit. + + If cancelled, kills the process and waits for it to finish + exiting before propagating the cancellation. + """ + with _core.open_cancel_scope(shield=True): + if self.stdin is not None: + await self.stdin.aclose() + if self.stdout is not None: + await self.stdout.aclose() + if self.stderr is not None: + await self.stderr.aclose() + try: + await self.wait() + finally: + if self.returncode is None: + self.kill() + with _core.open_cancel_scope(shield=True): + await self.wait() + + async def wait(self): + """Block until the process exits. + + Returns: + The exit status of the process (a nonnegative integer, with + zero usually indicating success). On UNIX systems, a process + that exits due to a signal will have its exit status reported + as the negative of that signal number, e.g., -11 for ``SIGSEGV``. 
+ """ + if self.poll() is None: + await wait_child_exiting(self) + self._proc.wait() + else: + await _core.checkpoint() + return self.returncode + + def poll(self): + """Forwards to :meth:`subprocess.Popen.poll`.""" + return self._proc.poll() + + def send_signal(self, sig): + """Forwards to :meth:`subprocess.Popen.send_signal`.""" + self._proc.send_signal(sig) + + def terminate(self): + """Forwards to :meth:`subprocess.Popen.terminate`.""" + self._proc.terminate() + + def kill(self): + """Forwards to :meth:`subprocess.Popen.kill`.""" + self._proc.kill() diff --git a/trio/_subprocess/__init__.py b/trio/_subprocess/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/trio/_subprocess/linux_waitpid.py b/trio/_subprocess/linux_waitpid.py deleted file mode 100644 index b9319865ec..0000000000 --- a/trio/_subprocess/linux_waitpid.py +++ /dev/null @@ -1,47 +0,0 @@ -import attr -import functools -import math -import os -import outcome -from typing import Any - -from .. import _core -from .._sync import CapacityLimiter, Event -from .._threads import run_sync_in_worker_thread - - -@attr.s -class WaitpidState: - pid = attr.ib() - event = attr.ib(default=attr.Factory(Event)) - outcome = attr.ib(default=None) - - -waitpid_limiter = CapacityLimiter(math.inf) - - -# adapted from -# https://github.com/python-trio/trio/issues/4#issuecomment-398967572 -async def _task(state: WaitpidState) -> None: - """The waitpid thread runner task. 
This must be spawned as a system - task.""" - partial = functools.partial( - os.waitpid, # function - state.pid, # pid - 0 # no options - ) - - tresult = await run_sync_in_worker_thread( - outcome.capture, partial, cancellable=True, limiter=waitpid_limiter - ) - state.outcome = tresult - state.event.set() - - -async def waitpid(pid: int) -> Any: - """Waits for a child process with the specified PID to finish running.""" - waiter = WaitpidState(pid=pid) - _core.spawn_system_task(_task, waiter) - - await waiter.event.wait() - return waiter.outcome.unwrap() diff --git a/trio/_subprocess_platform/__init__.py b/trio/_subprocess_platform/__init__.py new file mode 100644 index 0000000000..63ac754bcc --- /dev/null +++ b/trio/_subprocess_platform/__init__.py @@ -0,0 +1,108 @@ +# Platform-specific subprocess bits'n'pieces. + +import os +import sys +from typing import Tuple + +from .. import _core, _subprocess +from .._abc import SendStream, ReceiveStream + + +# Fallback versions of the functions provided -- implementations +# per OS are imported atop these at the bottom of the module. +async def wait_child_exiting(process: "_subprocess.Process") -> None: + """Block until the child process managed by ``process`` is exiting. + + It is invalid to call this function if the process has already + been waited on; that is, ``process.returncode`` must be None. + + When this function returns, it indicates that a call to + :meth:`subprocess.Popen.wait` will immediately be able to + return the process's exit status. The actual exit status is not + consumed by this call, since :class:`~subprocess.Popen` wants + to be able to do that itself. + """ + raise NotImplementedError from wait_child_exiting._error # pragma: no cover + + +async def create_pipe_to_child_stdin() -> Tuple[SendStream, int]: + """Create a new pipe suitable for sending data from this + process to the standard input of a child we're about to spawn. 
+
+    Returns:
+      A pair ``(trio_end, subprocess_end)`` where ``trio_end`` is a
+      :class:`~trio.abc.SendStream` and ``subprocess_end`` is
+      something suitable for passing as the ``stdin`` argument of
+      :class:`subprocess.Popen`.
+    """
+    raise NotImplementedError from (  # pragma: no cover
+        create_pipe_to_child_stdin._error
+    )
+
+
+async def create_pipe_from_child_output() -> Tuple[ReceiveStream, int]:
+    """Create a new pipe suitable for receiving data into this
+    process from the standard output or error stream of a child
+    we're about to spawn.
+
+    Returns:
+      A pair ``(trio_end, subprocess_end)`` where ``trio_end`` is a
+      :class:`~trio.abc.ReceiveStream` and ``subprocess_end`` is
+      something suitable for passing as the ``stdout`` or ``stderr``
+      argument of :class:`subprocess.Popen`.
+    """
+    raise NotImplementedError from (  # pragma: no cover
+        create_pipe_to_child_stdin._error
+    )
+
+
+try:
+    if os.name == "nt":
+        from .windows import wait_child_exiting  # noqa: F811
+    elif hasattr(_core, "wait_kevent"):
+        from .kqueue import wait_child_exiting  # noqa: F811
+    else:
+        from .waitid import wait_child_exiting  # noqa: F811
+except ImportError as ex:  # pragma: no cover
+    wait_child_exiting._error = ex
+
+try:
+    if os.name == "posix":
+        from .._unix_pipes import PipeSendStream, PipeReceiveStream
+
+        def create_pipe_to_child_stdin():  # noqa: F811
+            rfd, wfd = os.pipe()
+            return PipeSendStream(wfd), rfd
+
+        def create_pipe_from_child_output():  # noqa: F811
+            rfd, wfd = os.pipe()
+            return PipeReceiveStream(rfd), wfd
+
+    elif os.name == "nt":
+        from .._windows_pipes import PipeSendStream, PipeReceiveStream
+
+        # This isn't exported or documented, but it's also not
+        # underscore-prefixed, and seems kosher to use. The asyncio docs
+        # for 3.5 included an example that imported socketpair from
+        # windows_utils (before socket.socketpair existed on Windows), and
+        # when asyncio.windows_utils.socketpair was removed in 3.7, the
+        # removal was mentioned in the release notes.
+ from asyncio.windows_utils import pipe as windows_pipe + import msvcrt + + def create_pipe_to_child_stdin(): # noqa: F811 + # for stdin, we want the write end (our end) to use overlapped I/O + rh, wh = windows_pipe(overlapped=(False, True)) + return PipeSendStream(wh), msvcrt.open_osfhandle(rh, os.O_RDONLY) + + def create_pipe_from_child_output(): # noqa: F811 + # for stdout/err, it's the read end that's overlapped + rh, wh = windows_pipe(overlapped=(True, False)) + return PipeReceiveStream(rh), msvcrt.open_osfhandle(wh, 0) + + else: # pragma: no cover + raise ImportError("pipes not implemented on this platform") + +except ImportError as ex: # pragma: no cover + create_pipe_to_child_stdin._error = ex + create_pipe_from_child_output._error = ex diff --git a/trio/_subprocess_platform/kqueue.py b/trio/_subprocess_platform/kqueue.py new file mode 100644 index 0000000000..be60e52f41 --- /dev/null +++ b/trio/_subprocess_platform/kqueue.py @@ -0,0 +1,41 @@ +import select +from .. import _core, _subprocess + + +async def wait_child_exiting(process: "_subprocess.Process") -> None: + kqueue = _core.current_kqueue() + try: + from select import KQ_NOTE_EXIT + except ImportError: # pragma: no cover + # pypy doesn't define KQ_NOTE_EXIT: + # https://bitbucket.org/pypy/pypy/issues/2921/ + # I verified this value against both Darwin and FreeBSD + KQ_NOTE_EXIT = 0x80000000 + + make_event = lambda flags: select.kevent( + process.pid, + filter=select.KQ_FILTER_PROC, + flags=flags, + fflags=KQ_NOTE_EXIT + ) + + try: + kqueue.control( + [make_event(select.KQ_EV_ADD | select.KQ_EV_ONESHOT)], 0 + ) + except ProcessLookupError: + # This can happen if the process has already exited. + # Frustratingly, it does _not_ synchronize with calls + # to wait() and friends -- it's possible for kevent to + # return ESRCH but waitpid(..., WNOHANG) still returns + # nothing. And OS X doesn't support waitid() so we + # can't fall back to the Linux-style approach. 
So + # we'll just suppress the error, and let the caller + # assume their wait won't block for long. + return + + def abort(_): + kqueue.control([make_event(select.KQ_EV_DELETE)], 0) + return _core.Abort.SUCCEEDED + + await _core.wait_kevent(process.pid, select.KQ_FILTER_PROC, abort) diff --git a/trio/_subprocess_platform/waitid.py b/trio/_subprocess_platform/waitid.py new file mode 100644 index 0000000000..9b2f86f752 --- /dev/null +++ b/trio/_subprocess_platform/waitid.py @@ -0,0 +1,108 @@ +import errno +import math +import os +import sys + +from .. import _core, _subprocess +from .._sync import CapacityLimiter, Event +from .._threads import run_sync_in_worker_thread + +try: + from os import waitid + + def sync_wait_reapable(pid): + waitid(os.P_PID, pid, os.WEXITED | os.WNOWAIT) + +except ImportError: + # pypy doesn't define os.waitid so we need to pull it out ourselves + # using cffi: https://bitbucket.org/pypy/pypy/issues/2922/ + import cffi + waitid_ffi = cffi.FFI() + + # Believe it or not, siginfo_t starts with fields in the + # same layout on both Linux and Darwin. The Linux structure + # is bigger so that's what we use to size `pad`; while + # there are a few extra fields in there, most of it is + # true padding which would not be written by the syscall. 
+    waitid_ffi.cdef(
+        """
+typedef struct siginfo_s {
+    int si_signo;
+    int si_errno;
+    int si_code;
+    int si_pid;
+    int si_uid;
+    int si_status;
+    int pad[26];
+} siginfo_t;
+int waitid(int idtype, int id, siginfo_t* result, int options);
+"""
+    )
+    waitid = waitid_ffi.dlopen(None).waitid
+
+    def sync_wait_reapable(pid):
+        P_PID = 1
+        WEXITED = 0x00000004
+        if sys.platform == 'darwin':  # pragma: no cover
+            # waitid() is not exposed on Python on Darwin but does
+            # work through CFFI; note that we typically won't get
+            # here since Darwin also defines kqueue
+            WNOWAIT = 0x00000020
+        else:
+            WNOWAIT = 0x01000000
+        result = waitid_ffi.new("siginfo_t *")
+        while waitid(P_PID, pid, result, WEXITED | WNOWAIT) < 0:
+            got_errno = waitid_ffi.errno
+            if got_errno == errno.EINTR:
+                continue
+            raise OSError(got_errno, os.strerror(got_errno))
+
+
+# adapted from
+# https://github.com/python-trio/trio/issues/4#issuecomment-398967572
+
+waitid_limiter = CapacityLimiter(math.inf)
+
+
+async def _waitid_system_task(pid: int, event: Event) -> None:
+    """Spawn a thread that waits for ``pid`` to exit, then wake any tasks
+    that were waiting on it.
+    """
+    # cancellable=True: if this task is cancelled, then we abandon the
+    # thread to keep running waitpid in the background. Since this is
+    # always run as a system task, this will only happen if the whole
+    # call to trio.run is shutting down.
+
+    try:
+        await run_sync_in_worker_thread(
+            sync_wait_reapable,
+            pid,
+            cancellable=True,
+            limiter=waitid_limiter,
+        )
+    except OSError:
+        # If waitid fails, waitpid will fail too, so it still makes
+        # sense to wake up the callers of wait_child_exiting(). The
+        # most likely reason for this error in practice is a child
+        # exiting when wait() is not possible because SIGCHLD is
+        # ignored.
+ pass + finally: + event.set() + + +async def wait_child_exiting(process: "_subprocess.Process") -> None: + # Logic of this function: + # - The first time we get called, we create an Event and start + # an instance of _waitid_system_task that will set the Event + # when waitid() completes. If that Event is set before + # we get cancelled, we're good. + # - Otherwise, a following call after the cancellation must + # reuse the Event created during the first call, lest we + # create an arbitrary number of threads waiting on the same + # process. + + if process._wait_for_exit_data is None: + process._wait_for_exit_data = event = Event() + _core.spawn_system_task(_waitid_system_task, process.pid, event) + await process._wait_for_exit_data.wait() diff --git a/trio/_subprocess_platform/windows.py b/trio/_subprocess_platform/windows.py new file mode 100644 index 0000000000..958be8675c --- /dev/null +++ b/trio/_subprocess_platform/windows.py @@ -0,0 +1,6 @@ +from .. import _subprocess +from .._wait_for_object import WaitForSingleObject + + +async def wait_child_exiting(process: "_subprocess.Process") -> None: + await WaitForSingleObject(int(process._proc._handle)) diff --git a/trio/_subprocess/unix_pipes.py b/trio/_unix_pipes.py similarity index 94% rename from trio/_subprocess/unix_pipes.py rename to trio/_unix_pipes.py index 7d7c94891e..629d66da3a 100644 --- a/trio/_subprocess/unix_pipes.py +++ b/trio/_unix_pipes.py @@ -2,8 +2,8 @@ import os from typing import Tuple -from .. import _core, BrokenResourceError -from .._abc import SendStream, ReceiveStream +from . 
import _core +from ._abc import SendStream, ReceiveStream __all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"] @@ -65,7 +65,7 @@ async def send_all(self, data: bytes): total_sent += os.write(self._pipe, remaining) except BrokenPipeError as e: await _core.checkpoint() - raise BrokenResourceError from e + raise _core.BrokenResourceError from e except BlockingIOError: await self.wait_send_all_might_not_block() @@ -82,7 +82,7 @@ async def wait_send_all_might_not_block(self) -> None: # also doesn't checkpoint so we have to do that # ourselves here too await _core.checkpoint() - raise BrokenResourceError from e + raise _core.BrokenResourceError from e class PipeReceiveStream(_PipeMixin, ReceiveStream): diff --git a/trio/_windows_pipes.py b/trio/_windows_pipes.py new file mode 100644 index 0000000000..2b57de7692 --- /dev/null +++ b/trio/_windows_pipes.py @@ -0,0 +1,115 @@ +import os +from typing import Tuple + +from . import _core +from ._abc import SendStream, ReceiveStream +from ._util import ConflictDetector +from ._core._windows_cffi import _handle, raise_winerror, kernel32, ffi + +__all__ = ["PipeSendStream", "PipeReceiveStream", "make_pipe"] + + +class _PipeMixin: + def __init__(self, pipe_handle: int) -> None: + self._pipe = _handle(pipe_handle) + _core.register_with_iocp(self._pipe) + self._closed = False + self._conflict_detector = ConflictDetector( + "another task is currently {}ing data on this pipe".format( + type(self).__name__[4:-5].lower() # "send" or "receive" + ) + ) + + def _close(self): + if self._closed: + return + + self._closed = True + if not kernel32.CloseHandle(self._pipe): + raise_winerror() + + async def aclose(self): + self._close() + await _core.checkpoint() + + def __del__(self): + self._close() + + +class PipeSendStream(_PipeMixin, SendStream): + """Represents a send stream over a Windows named pipe that has been + opened in OVERLAPPED mode. 
+ """ + + async def send_all(self, data: bytes): + # adapted from the SocketStream code + async with self._conflict_detector: + if self._closed: + raise _core.ClosedResourceError("this pipe is already closed") + + if not data: + return + + # adapted from the SocketStream code + length = len(data) + with memoryview(data) as view: + total_sent = 0 + while total_sent < length: + # Only send 64K at a time. IOCP buffers can sometimes + # get pinned in kernel memory and we don't want to use + # an arbitrary amount. + with view[total_sent:total_sent + 65536] as chunk: + try: + total_sent += await _core.write_overlapped( + self._pipe, chunk + ) + except BrokenPipeError as ex: + raise _core.BrokenResourceError from ex + + async def wait_send_all_might_not_block(self) -> None: + async with self._conflict_detector: + if self._closed: + raise _core.ClosedResourceError("This pipe is already closed") + + # not implemented yet, and probably not needed + pass + + +class PipeReceiveStream(_PipeMixin, ReceiveStream): + """Represents a receive stream over an os.pipe object.""" + + async def receive_some(self, max_bytes: int) -> bytes: + async with self._conflict_detector: + if self._closed: + raise _core.ClosedResourceError("this pipe is already closed") + + if not isinstance(max_bytes, int): + raise TypeError("max_bytes must be integer >= 1") + + if max_bytes < 1: + raise ValueError("max_bytes must be integer >= 1") + + buffer = bytearray(max_bytes) + try: + size = await _core.readinto_overlapped(self._pipe, buffer) + except BrokenPipeError: + if self._closed: + raise _core.ClosedResourceError( + "another task closed this pipe" + ) from None + + # Windows raises BrokenPipeError on one end of a pipe + # whenever the other end closes, regardless of direction. + # Convert this to the Unix behavior of returning EOF to the + # reader when the writer closes. 
+ return b"" + else: + del buffer[size:] + return buffer + + +async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]: + """Makes a new pair of pipes.""" + from asyncio.windows_utils import pipe + (r, w) = pipe() + return PipeSendStream(w), PipeReceiveStream(r) diff --git a/trio/hazmat.py b/trio/hazmat.py index eaab2ea3dd..2c19bd81ad 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -46,8 +46,12 @@ # Windows symbols try: from ._core import ( - current_iocp, register_with_iocp, wait_overlapped, - monitor_completion_key + current_iocp, + register_with_iocp, + wait_overlapped, + monitor_completion_key, + readinto_overlapped, + write_overlapped, ) except ImportError: pass diff --git a/trio/subprocess.py b/trio/subprocess.py new file mode 100644 index 0000000000..b91e28784a --- /dev/null +++ b/trio/subprocess.py @@ -0,0 +1,28 @@ +from ._subprocess import Process + +# Reexport constants and exceptions from the stdlib subprocess module +from subprocess import ( + PIPE, STDOUT, DEVNULL, CalledProcessError, SubprocessError, TimeoutExpired, + CompletedProcess +) + +# Windows only +try: + from subprocess import ( + STARTUPINFO, STD_INPUT_HANDLE, STD_OUTPUT_HANDLE, STD_ERROR_HANDLE, + SW_HIDE, STARTF_USESTDHANDLES, STARTF_USESHOWWINDOW, + CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP + ) +except ImportError: + pass + +# Windows 3.7+ only +try: + from subprocess import ( + ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, + HIGH_PRIORITY_CLASS, IDLE_PRIORITY_CLASS, NORMAL_PRIORITY_CLASS, + REALTIME_PRIORITY_CLASS, CREATE_NO_WINDOW, DETACHED_PROCESS, + CREATE_DEFAULT_ERROR_MODE, CREATE_BREAKAWAY_FROM_JOB + ) +except ImportError: + pass diff --git a/trio/tests/subprocess/__init__.py b/trio/tests/subprocess/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/trio/tests/subprocess/test_waitpid_linux.py b/trio/tests/subprocess/test_waitpid_linux.py deleted file mode 100644 index 53a37c61a1..0000000000 --- 
a/trio/tests/subprocess/test_waitpid_linux.py +++ /dev/null @@ -1,39 +0,0 @@ -import sys - -import os -import pytest -import signal - -from ... import _core -from ..._subprocess.linux_waitpid import waitpid - -pytestmark = pytest.mark.skipif( - sys.platform != "linux", reason="linux waitpid only works on linux" -) - - -async def test_waitpid(): - pid = os.spawnvp(os.P_NOWAIT, "/bin/false", ("false",)) - result = await waitpid(pid) - # exit code is a 16-bit int: (code, signal) - assert result[0] == pid - assert os.WIFEXITED(result[1]) and os.WEXITSTATUS(result[1]) == 1 - - pid2 = os.spawnvp(os.P_NOWAIT, "/bin/true", ("true",)) - result = await waitpid(pid2) - assert result[0] == pid2 - assert os.WIFEXITED(result[1]) and os.WEXITSTATUS(result[1]) == 0 - - pid3 = os.spawnvp(os.P_NOWAIT, "/bin/sleep", ("/bin/sleep", "5")) - os.kill(pid3, signal.SIGKILL) - result = await waitpid(pid3) - assert result[0] == pid3 - status = os.WTERMSIG(result[1]) - assert os.WIFSIGNALED(result[1]) and status == 9 - - -async def test_waitpid_no_process(): - with pytest.raises(ChildProcessError): - # this PID does exist, but it's ourselves - # which doesn't work - await waitpid(os.getpid()) diff --git a/trio/tests/test_subprocess.py b/trio/tests/test_subprocess.py new file mode 100644 index 0000000000..e0bd0857ed --- /dev/null +++ b/trio/tests/test_subprocess.py @@ -0,0 +1,303 @@ +import math +import os +import random +import signal +import sys +import pytest + +from .. import ( + _core, move_on_after, fail_after, sleep, sleep_forever, subprocess +) +from .._core.tests.tutil import slow +from ..testing import wait_all_tasks_blocked + +posix = os.name == "posix" +if posix: + from signal import SIGKILL, SIGTERM, SIGINT +else: + SIGKILL, SIGTERM, SIGINT = None, None, None + + +# Since Windows has very few command-line utilities generally available, +# all of our subprocesses are Python processes running short bits of +# (mostly) cross-platform code. 
+def python(code): + return [sys.executable, "-u", "-c", "import sys; " + code] + + +EXIT_TRUE = python("sys.exit(0)") +EXIT_FALSE = python("sys.exit(1)") +CAT = python("sys.stdout.buffer.write(sys.stdin.buffer.read())") +SLEEP = lambda seconds: python("import time; time.sleep({})".format(seconds)) + + +def got_signal(proc, sig): + if posix: + return proc.returncode == -sig + else: + return proc.returncode != 0 + + +async def test_basic(): + async with subprocess.Process(EXIT_TRUE) as proc: + assert proc.returncode is None + assert proc.returncode == 0 + + +async def test_kill_when_context_cancelled(): + with move_on_after(0) as scope: + async with subprocess.Process(SLEEP(10)) as proc: + assert proc.poll() is None + # Process context entry is synchronous, so this is the + # only checkpoint: + await sleep_forever() + assert scope.cancelled_caught + assert got_signal(proc, SIGKILL) + + +COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR = python( + "data = sys.stdin.buffer.read(); " + "sys.stdout.buffer.write(data); " + "sys.stderr.buffer.write(data[::-1])" +) + + +async def test_pipes(): + async with subprocess.Process( + COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as proc: + msg = b"the quick brown fox jumps over the lazy dog" + + async def feed_input(): + await proc.stdin.send_all(msg) + await proc.stdin.aclose() + + async def check_output(stream, expected): + seen = bytearray() + while True: + chunk = await stream.receive_some(4096) + if not chunk: + break + seen.extend(chunk) + assert seen == expected + + async with _core.open_nursery() as nursery: + # fail quickly if something is broken + nursery.cancel_scope.deadline = _core.current_time() + 3.0 + nursery.start_soon(feed_input) + nursery.start_soon(check_output, proc.stdout, msg) + nursery.start_soon(check_output, proc.stderr, msg[::-1]) + + assert not nursery.cancel_scope.cancelled_caught + assert 0 == await proc.wait() + + +async def 
test_interactive(): + # Test some back-and-forth with a subprocess. This one works like so: + # in: 32\n + # out: 0000...0000\n (32 zeroes) + # err: 1111...1111\n (64 ones) + # in: 10\n + # out: 2222222222\n (10 twos) + # err: 3333....3333\n (20 threes) + # in: EOF + # out: EOF + # err: EOF + + async with subprocess.Process( + python( + "idx = 0\n" + "while True:\n" + " line = sys.stdin.readline()\n" + " if line == '': break\n" + " request = int(line.strip())\n" + " print(str(idx * 2) * request)\n" + " print(str(idx * 2 + 1) * request * 2, file=sys.stderr)\n" + " idx += 1\n" + ), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as proc: + + newline = b"\n" if posix else b"\r\n" + + async def expect(idx, request): + async with _core.open_nursery() as nursery: + + async def drain_one(stream, count, digit): + while count > 0: + result = await stream.receive_some(count) + assert result == ( + "{}".format(digit).encode("utf-8") * len(result) + ) + count -= len(result) + assert count == 0 + assert await stream.receive_some(len(newline)) == newline + + nursery.start_soon(drain_one, proc.stdout, request, idx * 2) + nursery.start_soon( + drain_one, proc.stderr, request * 2, idx * 2 + 1 + ) + + with fail_after(5): + await proc.stdin.send_all(b"12") + await sleep(0.1) + await proc.stdin.send_all(b"345" + newline) + await expect(0, 12345) + await proc.stdin.send_all(b"100" + newline + b"200" + newline) + await expect(1, 100) + await expect(2, 200) + await proc.stdin.send_all(b"0" + newline) + await expect(3, 0) + await proc.stdin.send_all(b"999999") + with move_on_after(0.1) as scope: + await expect(4, 0) + assert scope.cancelled_caught + await proc.stdin.send_all(newline) + await expect(4, 999999) + await proc.stdin.aclose() + assert await proc.stdout.receive_some(1) == b"" + assert await proc.stderr.receive_some(1) == b"" + assert proc.returncode == 0 + + +async def test_stderr_stdout(): + async with subprocess.Process( + 
COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) as proc: + assert proc.stdout is not None + assert proc.stderr is None + await proc.stdin.send_all(b"1234") + await proc.stdin.aclose() + + output = [] + while True: + chunk = await proc.stdout.receive_some(16) + if chunk == b"": + break + output.append(chunk) + assert b"".join(output) == b"12344321" + assert proc.returncode == 0 + + # this one hits the branch where stderr=STDOUT but stdout + # is not redirected + async with subprocess.Process( + CAT, stdin=subprocess.PIPE, stderr=subprocess.STDOUT + ) as proc: + assert proc.stdout is None + assert proc.stderr is None + await proc.stdin.aclose() + assert proc.returncode == 0 + + if posix: + try: + r, w = os.pipe() + + async with subprocess.Process( + COPY_STDIN_TO_STDOUT_AND_BACKWARD_TO_STDERR, + stdin=subprocess.PIPE, + stdout=w, + stderr=subprocess.STDOUT, + ) as proc: + os.close(w) + assert proc.stdout is None + assert proc.stderr is None + await proc.stdin.send_all(b"1234") + await proc.stdin.aclose() + assert await proc.wait() == 0 + assert os.read(r, 4096) == b"12344321" + assert os.read(r, 4096) == b"" + finally: + os.close(r) + + +async def test_errors(): + with pytest.raises(TypeError) as excinfo: + subprocess.Process(["ls"], encoding="utf-8") + assert "unbuffered byte streams" in str(excinfo.value) + assert "the 'encoding' option is not supported" in str(excinfo.value) + + +async def test_signals(): + async def test_one_signal(send_it, signum): + with move_on_after(1.0) as scope: + async with subprocess.Process(SLEEP(3600)) as proc: + send_it(proc) + assert not scope.cancelled_caught + if posix: + assert proc.returncode == -signum + else: + assert proc.returncode != 0 + + await test_one_signal(subprocess.Process.kill, SIGKILL) + await test_one_signal(subprocess.Process.terminate, SIGTERM) + if posix: + await test_one_signal(lambda proc: proc.send_signal(SIGINT), SIGINT) + + 
+@pytest.mark.skipif(not posix, reason="POSIX specific") +async def test_wait_reapable_fails(): + old_sigchld = signal.signal(signal.SIGCHLD, signal.SIG_IGN) + try: + # With SIGCHLD disabled, the wait() syscall will wait for the + # process to exit but then fail with ECHILD. Make sure we + # support this case as the stdlib subprocess module does. + async with subprocess.Process(SLEEP(3600)) as proc: + async with _core.open_nursery() as nursery: + nursery.start_soon(proc.wait) + await wait_all_tasks_blocked() + proc.kill() + nursery.cancel_scope.deadline = _core.current_time() + 1.0 + assert not nursery.cancel_scope.cancelled_caught + assert proc.returncode == 0 # exit status unknowable, so... + finally: + signal.signal(signal.SIGCHLD, old_sigchld) + + +@slow +def test_waitid_eintr(): + # This only matters on PyPy (where we're coding EINTR handling + # ourselves) but the test works on all waitid platforms. + from .._subprocess_platform import wait_child_exiting + if not wait_child_exiting.__module__.endswith("waitid"): + pytest.skip("waitid only") + from .._subprocess_platform.waitid import sync_wait_reapable + import subprocess as stdlib_subprocess + + got_alarm = False + sleeper = stdlib_subprocess.Popen(["sleep", "3600"]) + + def on_alarm(sig, frame): + nonlocal got_alarm + got_alarm = True + sleeper.kill() + + old_sigalrm = signal.signal(signal.SIGALRM, on_alarm) + try: + signal.alarm(1) + sync_wait_reapable(sleeper.pid) + assert sleeper.wait(timeout=1) == -9 + finally: + if sleeper.returncode is None: # pragma: no cover + # We only get here if something fails in the above; + # if the test passes, wait() will reap the process + sleeper.kill() + sleeper.wait() + signal.signal(signal.SIGALRM, old_sigalrm) + + +def test_all_constants_reexported(): + trio_subprocess_exports = set(dir(subprocess)) + import subprocess as stdlib_subprocess + + for name in dir(stdlib_subprocess): + if name.isupper() and name[0] != "_": + stdlib_constant = name + assert stdlib_constant 
in trio_subprocess_exports diff --git a/trio/tests/subprocess/test_unix_pipes.py b/trio/tests/test_unix_pipes.py similarity index 85% rename from trio/tests/subprocess/test_unix_pipes.py rename to trio/tests/test_unix_pipes.py index b8cc2d496a..2e8815ca06 100644 --- a/trio/tests/subprocess/test_unix_pipes.py +++ b/trio/tests/test_unix_pipes.py @@ -4,20 +4,14 @@ import os import pytest -from trio._core.tests.tutil import gc_collect_harder -from ... import _core -from ...testing import (wait_all_tasks_blocked, check_one_way_stream) +from .._core.tests.tutil import gc_collect_harder +from .. import _core, move_on_after +from ..testing import wait_all_tasks_blocked, check_one_way_stream posix = os.name == "posix" - -pytestmark = pytest.mark.skipif( - not posix, reason="pipes are only supported on posix" -) - +pytestmark = pytest.mark.skipif(not posix, reason="posix only") if posix: - from ..._subprocess.unix_pipes import ( - PipeSendStream, PipeReceiveStream, make_pipe - ) + from .._unix_pipes import PipeSendStream, PipeReceiveStream, make_pipe async def test_send_pipe(): @@ -68,16 +62,6 @@ async def reader(): await write.aclose() -async def test_send_on_closed_pipe(): - write, read = await make_pipe() - await write.aclose() - - with pytest.raises(_core.ClosedResourceError): - await write.send_all(b"123") - - await read.aclose() - - async def test_pipe_errors(): with pytest.raises(TypeError): PipeReceiveStream(None) diff --git a/trio/tests/test_windows_pipes.py b/trio/tests/test_windows_pipes.py new file mode 100644 index 0000000000..bba3cfb00b --- /dev/null +++ b/trio/tests/test_windows_pipes.py @@ -0,0 +1,78 @@ +import errno +import select + +import os +import pytest + +from .._core.tests.tutil import gc_collect_harder +from .. 
import _core, move_on_after
+from ..testing import wait_all_tasks_blocked, check_one_way_stream
+
+windows = os.name == "nt"
+pytestmark = pytest.mark.skipif(not windows, reason="windows only")
+if windows:
+    from .._windows_pipes import PipeSendStream, PipeReceiveStream, make_pipe
+
+
+async def test_pipes_combined():
+    write, read = await make_pipe()
+    count = 2**20
+
+    async def sender():
+        big = bytearray(count)
+        await write.send_all(big)
+
+    async def reader():
+        await wait_all_tasks_blocked()
+        received = 0
+        while received < count:
+            received += len(await read.receive_some(4096))
+
+        assert received == count
+
+    async with _core.open_nursery() as n:
+        n.start_soon(sender)
+        n.start_soon(reader)
+
+    await read.aclose()
+    await write.aclose()
+
+
+async def test_pipe_errors():
+    with pytest.raises(TypeError):
+        PipeReceiveStream(None)
+
+
+async def test_async_with():
+    w, r = await make_pipe()
+    async with w, r:
+        pass
+
+    assert w._closed
+    assert r._closed
+
+    # test failure-to-close
+    w._closed = False
+    with pytest.raises(OSError):
+        await w.aclose()
+
+
+async def test_close_during_write():
+    w, r = await make_pipe()
+    async with _core.open_nursery() as nursery:
+
+        async def write_forever():
+            with pytest.raises(_core.ClosedResourceError) as excinfo:
+                while True:
+                    await w.send_all(b"x" * 4096)
+            assert "another task" in str(excinfo)
+
+        nursery.start_soon(write_forever)
+        await wait_all_tasks_blocked(0.1)
+        await w.aclose()
+
+
+async def test_pipe_fully():
+    # passing make_clogged_pipe tests wait_send_all_might_not_block, and we
+    # can't implement that on Windows
+    await check_one_way_stream(make_pipe, None)