From 7cfacc4795e064238dde646a0d72790b839c7fea Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 21 Nov 2025 11:30:11 -0800 Subject: [PATCH 01/17] import tornado selector thread v6.5.1 License: Apache-2.0 Copyright (c) 2025 The Tornado Authors unmodified from original, so we can track changes --- Lib/asyncio/_selector_thread.py | 302 ++++++++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 Lib/asyncio/_selector_thread.py diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py new file mode 100644 index 00000000000000..ac0ed9c10ede50 --- /dev/null +++ b/Lib/asyncio/_selector_thread.py @@ -0,0 +1,302 @@ +""" +Compatibility for [add|remove]_[reader|writer] where unavailable (Proactor). + +Runs select in a background thread. + +Adapted from Tornado 6.5.1 + +:copyright: 2025, The Tornado Authors +:license: Apache-2.0 +""" + +import asyncio +import atexit +import contextvars +import errno +import functools +import select +import socket +import threading +import typing + +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + Protocol, + Set, + Tuple, + TypeVar, + Union, +) + +if typing.TYPE_CHECKING: + from typing_extensions import TypeVarTuple, Unpack + + +class _HasFileno(Protocol): + def fileno(self) -> int: + pass + + +_FileDescriptorLike = Union[int, _HasFileno] + +_T = TypeVar("_T") + +if typing.TYPE_CHECKING: + _Ts = TypeVarTuple("_Ts") + +# Collection of selector thread event loops to shut down on exit. +_selector_loops: Set["SelectorThread"] = set() + + +def _atexit_callback() -> None: + for loop in _selector_loops: + with loop._select_cond: + loop._closing_selector = True + loop._select_cond.notify() + try: + loop._waker_w.send(b"a") + except BlockingIOError: + pass + if loop._thread is not None: + # If we don't join our (daemon) thread here, we may get a deadlock + # during interpreter shutdown. I don't really understand why. This + # deadlock happens every time in CI (both travis and appveyor) but + # I've never been able to reproduce locally. + loop._thread.join() + _selector_loops.clear() + + +atexit.register(_atexit_callback) + + +class SelectorThread: + """Define ``add_reader`` methods to be called in a background select thread. + + Instances of this class start a second thread to run a selector. + This thread is completely hidden from the user; + all callbacks are run on the wrapped event loop's thread. + + Typically used via ``AddThreadSelectorEventLoop``, + but can be attached to a running asyncio loop. + """ + + _closed = False + + def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: + self._main_thread_ctx = contextvars.copy_context() + + self._real_loop = real_loop + + self._select_cond = threading.Condition() + self._select_args: Optional[ + Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]] + ] = None + self._closing_selector = False + self._thread: Optional[threading.Thread] = None + self._thread_manager_handle = self._thread_manager() + + async def thread_manager_anext() -> None: + # the anext builtin wasn't added until 3.10. We just need to iterate + # this generator one step. + await self._thread_manager_handle.__anext__() + + # When the loop starts, start the thread. Not too soon because we can't + # clean up if we get to this point but the event loop is closed without + # starting. + self._real_loop.call_soon( + lambda: self._real_loop.create_task(thread_manager_anext()), + context=self._main_thread_ctx, + ) + + self._readers: Dict[_FileDescriptorLike, Callable] = {} + self._writers: Dict[_FileDescriptorLike, Callable] = {} + + # Writing to _waker_w will wake up the selector thread, which + # watches for _waker_r to be readable. + self._waker_r, self._waker_w = socket.socketpair() + self._waker_r.setblocking(False) + self._waker_w.setblocking(False) + _selector_loops.add(self) + self.add_reader(self._waker_r, self._consume_waker) + + def close(self) -> None: + if self._closed: + return + with self._select_cond: + self._closing_selector = True + self._select_cond.notify() + self._wake_selector() + if self._thread is not None: + self._thread.join() + _selector_loops.discard(self) + self.remove_reader(self._waker_r) + self._waker_r.close() + self._waker_w.close() + self._closed = True + + async def _thread_manager(self) -> typing.AsyncGenerator[None, None]: + # Create a thread to run the select system call. We manage this thread + # manually so we can trigger a clean shutdown from an atexit hook. Note + # that due to the order of operations at shutdown, only daemon threads + # can be shut down in this way (non-daemon threads would require the + # introduction of a new hook: https://bugs.python.org/issue41962) + self._thread = threading.Thread( + name="Tornado selector", + daemon=True, + target=self._run_select, + ) + self._thread.start() + self._start_select() + try: + # The presense of this yield statement means that this coroutine + # is actually an asynchronous generator, which has a special + # shutdown protocol. We wait at this yield point until the + # event loop's shutdown_asyncgens method is called, at which point + # we will get a GeneratorExit exception and can shut down the + # selector thread. + yield + except GeneratorExit: + self.close() + raise + + def _wake_selector(self) -> None: + if self._closed: + return + try: + self._waker_w.send(b"a") + except BlockingIOError: + pass + + def _consume_waker(self) -> None: + try: + self._waker_r.recv(1024) + except BlockingIOError: + pass + + def _start_select(self) -> None: + # Capture reader and writer sets here in the event loop + # thread to avoid any problems with concurrent + # modification while the select loop uses them. + with self._select_cond: + assert self._select_args is None + self._select_args = (list(self._readers.keys()), list(self._writers.keys())) + self._select_cond.notify() + + def _run_select(self) -> None: + while True: + with self._select_cond: + while self._select_args is None and not self._closing_selector: + self._select_cond.wait() + if self._closing_selector: + return + assert self._select_args is not None + to_read, to_write = self._select_args + self._select_args = None + + # We use the simpler interface of the select module instead of + # the more stateful interface in the selectors module because + # this class is only intended for use on windows, where + # select.select is the only option. The selector interface + # does not have well-documented thread-safety semantics that + # we can rely on so ensuring proper synchronization would be + # tricky. + try: + # On windows, selecting on a socket for write will not + # return the socket when there is an error (but selecting + # for reads works). Also select for errors when selecting + # for writes, and merge the results. + # + # This pattern is also used in + # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317 + rs, ws, xs = select.select(to_read, to_write, to_write) + ws = ws + xs + except OSError as e: + # After remove_reader or remove_writer is called, the file + # descriptor may subsequently be closed on the event loop + # thread. It's possible that this select thread hasn't + # gotten into the select system call by the time that + # happens in which case (at least on macOS), select may + # raise a "bad file descriptor" error. If we get that + # error, check and see if we're also being woken up by + # polling the waker alone. If we are, just return to the + # event loop and we'll get the updated set of file + # descriptors on the next iteration. Otherwise, raise the + # original error. + if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF): + rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0) + if rs: + ws = [] + else: + raise + else: + raise + + try: + self._real_loop.call_soon_threadsafe( + self._handle_select, rs, ws, context=self._main_thread_ctx + ) + except RuntimeError: + # "Event loop is closed". Swallow the exception for + # consistency with PollIOLoop (and logical consistency + # with the fact that we can't guarantee that an + # add_callback that completes without error will + # eventually execute). + pass + except AttributeError: + # ProactorEventLoop may raise this instead of RuntimeError + # if call_soon_threadsafe races with a call to close(). + # Swallow it too for consistency. + pass + + def _handle_select( + self, rs: List[_FileDescriptorLike], ws: List[_FileDescriptorLike] + ) -> None: + for r in rs: + self._handle_event(r, self._readers) + for w in ws: + self._handle_event(w, self._writers) + self._start_select() + + def _handle_event( + self, + fd: _FileDescriptorLike, + cb_map: Dict[_FileDescriptorLike, Callable], + ) -> None: + try: + callback = cb_map[fd] + except KeyError: + return + callback() + + def add_reader( + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + self._readers[fd] = functools.partial(callback, *args) + self._wake_selector() + + def add_writer( + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + self._writers[fd] = functools.partial(callback, *args) + self._wake_selector() + + def remove_reader(self, fd: _FileDescriptorLike) -> bool: + try: + del self._readers[fd] + except KeyError: + return False + self._wake_selector() + return True + + def remove_writer(self, fd: _FileDescriptorLike) -> bool: + try: + del self._writers[fd] + except KeyError: + return False + self._wake_selector() + return True + From dabc7390c1921fe76f347b5b117e29e61f07c0f4 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 21 Nov 2025 14:01:22 -0800 Subject: [PATCH 02/17] remove some unused typing from selector thread --- Lib/asyncio/_selector_thread.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py index ac0ed9c10ede50..7bc1881fe84e0f 100644 --- a/Lib/asyncio/_selector_thread.py +++ b/Lib/asyncio/_selector_thread.py @@ -9,6 +9,8 @@ :license: Apache-2.0 """ +from __future__ import annotations + import asyncio import atexit import contextvars @@ -32,9 +34,6 @@ Union, ) -if typing.TYPE_CHECKING: - from typing_extensions import TypeVarTuple, Unpack - class _HasFileno(Protocol): def fileno(self) -> int: @@ -45,9 +44,6 @@ def fileno(self) -> int: _T = TypeVar("_T") -if typing.TYPE_CHECKING: - _Ts = TypeVarTuple("_Ts") - # Collection of selector thread event loops to shut down on exit. _selector_loops: Set["SelectorThread"] = set() @@ -299,4 +295,3 @@ def remove_writer(self, fd: _FileDescriptorLike) -> bool: return False self._wake_selector() return True - From cc0e55a185058b4aa87b82b794d117465d9f228b Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 21 Nov 2025 14:45:28 -0800 Subject: [PATCH 03/17] Add ProactorEventLoop.add_reader and friends via background SelectorThread, imported from tornado --- Doc/library/asyncio-eventloop.rst | 3 ++ Doc/library/asyncio-platforms.rst | 10 ++-- Lib/asyncio/proactor_events.py | 28 ++++++++++ Lib/test/test_asyncio/test_windows_events.py | 52 +++++++++++++++++++ ...5-11-21-14-44-48.gh-issue-81554.hNFGMW.rst | 2 + 5 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 Misc/NEWS.d/next/Windows/2025-11-21-14-44-48.gh-issue-81554.hNFGMW.rst diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 72f484fd1cbe77..0c11198e3a377b 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -1051,6 +1051,9 @@ Watching file descriptors See also :ref:`Platform Support ` section for some limitations of these methods. +.. versionchanged:: 3.15 + + Added support for these methods to :class:`ProactorEventLoop`. Working with socket objects directly ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/Doc/library/asyncio-platforms.rst b/Doc/library/asyncio-platforms.rst index a2a3114ad6e4c5..23ef1e80bdc723 100644 --- a/Doc/library/asyncio-platforms.rst +++ b/Doc/library/asyncio-platforms.rst @@ -16,7 +16,7 @@ due to the platforms' underlying architecture and capabilities. All Platforms ============= -* :meth:`loop.add_reader` and :meth:`loop.add_writer` +* :meth:`~asyncio.loop.add_reader` and :meth:`~asyncio.loop.add_writer` cannot be used to monitor file I/O. @@ -59,8 +59,9 @@ All event loops on Windows do not support the following methods: :class:`ProactorEventLoop` has the following limitations: -* The :meth:`loop.add_reader` and :meth:`loop.add_writer` - methods are not supported. +* :meth:`loop.add_reader` and :meth:`loop.add_writer` only accept + socket handles (e.g. pipe file descriptors are not supported). + When called, :func:`select.select` is run in an additional thread. The resolution of the monotonic clock on Windows is usually around 15.6 milliseconds. The best resolution is 0.5 milliseconds. The resolution depends on the @@ -68,6 +69,9 @@ hardware (availability of `HPET `_) and on the Windows configuration. +.. versionadded:: 3.15 + + Support for :meth:`loop.add_reader`, :meth:`loop.add_writer` added to :class:`ProactorEventLoop`. .. _asyncio-windows-subprocess: diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index f404273c3ae5c1..3c63f0241636ce 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -23,6 +23,7 @@ from . import transports from . import trsock from .log import logger +from ._selector_thread import SelectorThread as _SelectorThread def _set_socket_extra(transport, sock): @@ -633,6 +634,7 @@ def __init__(self, proactor): logger.debug('Using proactor: %s', proactor.__class__.__name__) self._proactor = proactor self._selector = proactor # convenient alias + self._selector_thread = None self._self_reading_future = None self._accept_futures = {} # socket file descriptor => Future proactor.set_loop(self) @@ -641,6 +643,17 @@ def __init__(self, proactor): # wakeup fd can only be installed to a file descriptor from the main thread signal.set_wakeup_fd(self._csock.fileno()) + def _get_selector_thread(self): + """Return the SelectorThread. + + creating it on first request, + so no thread is created until/unless + the first call to `add_reader` and friends. + """ + if self._selector_thread is None: + self._selector_thread = _SelectorThread(self) + return self._selector_thread + def _make_socket_transport(self, sock, protocol, waiter=None, extra=None, server=None): return _ProactorSocketTransport(self, sock, protocol, waiter, @@ -697,10 +710,25 @@ def close(self): self._proactor.close() self._proactor = None self._selector = None + if self._selector_thread is not None: + self._selector_thread.close() + self._selector_thread = None # Close the event loop super().close() + def add_reader(self, fd, callback, *args): + return self._get_selector_thread().add_reader(fd, callback, *args) + + def remove_reader(self, fd): + return self._get_selector_thread().remove_reader(fd) + + def add_writer(self, fd, callback, *args): + return self._get_selector_thread().add_writer(fd, callback, *args) + + def remove_writer(self, fd): + return self._get_selector_thread().remove_writer(fd) + async def sock_recv(self, sock, n): return await self._proactor.recv(sock, n) diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 0af3368627afca..eff1b7601e9d2e 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -323,6 +323,58 @@ def threadMain(): stop.set() thr.join() + def test_add_reader_invalid_argument(self): + def assert_raises(): + return self.assertRaisesRegex(ValueError, r'Invalid file object') + + def cb(sock): + return None + + with assert_raises(): + self.loop.add_reader(object(), cb) + with assert_raises(): + self.loop.add_writer(object(), cb) + + with assert_raises(): + self.loop.remove_reader(object()) + with assert_raises(): + self.loop.remove_writer(object()) + + def test_selector_thread(self): + assert self.loop._selector_thread is None + a, b = socket.socketpair() + async def _test(): + read_future = asyncio.Future() + sent = b"asdf" + + def write(): + a.sendall(sent) + self.loop.remove_writer(a) + + def read(): + msg = b.recv(100) + read_future.set_result(msg) + + self.loop.add_reader(b, read) + _selector_thread = self.loop._selector_thread + assert b in _selector_thread._readers + assert _selector_thread is not None + self.loop.add_writer(a, write) + assert self.loop._selector_thread is _selector_thread + assert a in _selector_thread._writers + msg = await asyncio.wait_for(read_future, timeout=10) + + self.loop.remove_writer(a) + assert a not in _selector_thread._writers + self.loop.remove_reader(b) + assert b not in _selector_thread._readers + a.close() + b.close() + assert self.loop._selector_thread is _selector_thread + assert msg == sent + + self.loop.run_until_complete(_test()) + class WinPolicyTests(WindowsEventsTestCase): diff --git a/Misc/NEWS.d/next/Windows/2025-11-21-14-44-48.gh-issue-81554.hNFGMW.rst b/Misc/NEWS.d/next/Windows/2025-11-21-14-44-48.gh-issue-81554.hNFGMW.rst new file mode 100644 index 00000000000000..efbd11b6357e53 --- /dev/null +++ b/Misc/NEWS.d/next/Windows/2025-11-21-14-44-48.gh-issue-81554.hNFGMW.rst @@ -0,0 +1,2 @@ +Added support for :meth:`~asyncio.loop.add_reader` to :class:`~asyncio.ProactorEventLoop` on Windows by +running :func:`select.select` in a background thread. From dacf0bb6cde0e8dc681869286db72fa543b491a4 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 21 Nov 2025 15:08:03 -0800 Subject: [PATCH 04/17] update spdx comments for tornado 6.5.2 --- Lib/asyncio/_selector_thread.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py index 7bc1881fe84e0f..1ea2d16ce8c71c 100644 --- a/Lib/asyncio/_selector_thread.py +++ b/Lib/asyncio/_selector_thread.py @@ -1,12 +1,13 @@ +# Contains code from https://github.com/tornadoweb/tornado/tree/v6.5.2 +# SPDX-License-Identifier: PSF-2.0 AND Apache-2.0 +# SPDX-FileCopyrightText: Copyright (c) 2025 The Tornado Authors + """ Compatibility for [add|remove]_[reader|writer] where unavailable (Proactor). Runs select in a background thread. -Adapted from Tornado 6.5.1 - -:copyright: 2025, The Tornado Authors -:license: Apache-2.0 +Adapted from Tornado 6.5.2 """ from __future__ import annotations From a720ba068ae6119ec14e74ea3d69088b5cba754f Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 24 Nov 2025 08:27:18 -0800 Subject: [PATCH 05/17] Apply suggestions from code review Co-authored-by: Stan Ulbrych <89152624+StanFromIreland@users.noreply.github.com> --- Doc/library/asyncio-eventloop.rst | 1 + Doc/library/asyncio-platforms.rst | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 0c11198e3a377b..d260fb5004cdc3 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -1055,6 +1055,7 @@ for some limitations of these methods. Added support for these methods to :class:`ProactorEventLoop`. + Working with socket objects directly ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/Doc/library/asyncio-platforms.rst b/Doc/library/asyncio-platforms.rst index 23ef1e80bdc723..322fa0f3fcda8e 100644 --- a/Doc/library/asyncio-platforms.rst +++ b/Doc/library/asyncio-platforms.rst @@ -60,7 +60,7 @@ All event loops on Windows do not support the following methods: :class:`ProactorEventLoop` has the following limitations: * :meth:`loop.add_reader` and :meth:`loop.add_writer` only accept - socket handles (e.g. pipe file descriptors are not supported). + socket handles (for example, pipe file descriptors are not supported). When called, :func:`select.select` is run in an additional thread. The resolution of the monotonic clock on Windows is usually around 15.6 @@ -73,6 +73,7 @@ Windows configuration. Support for :meth:`loop.add_reader`, :meth:`loop.add_writer` added to :class:`ProactorEventLoop`. + .. _asyncio-windows-subprocess: Subprocess Support on Windows From 5135dab3488a63fb3efc5cb0519922fdffe326d1 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 24 Nov 2025 08:54:50 -0800 Subject: [PATCH 06/17] check and split fd-like to int FD in proactor add_reader matches tornado behavior tornado handles this in IOLoop, so we need to include it --- Lib/asyncio/_selector_thread.py | 31 ++++++++++++++++---- Lib/test/test_asyncio/test_windows_events.py | 8 ++--- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py index 1ea2d16ce8c71c..1f3cfcb540e74a 100644 --- a/Lib/asyncio/_selector_thread.py +++ b/Lib/asyncio/_selector_thread.py @@ -109,8 +109,8 @@ async def thread_manager_anext() -> None: context=self._main_thread_ctx, ) - self._readers: Dict[_FileDescriptorLike, Callable] = {} - self._writers: Dict[_FileDescriptorLike, Callable] = {} + self._readers: Dict[int, Tuple[_FileDescriptorLike, Callable]] = {} + self._writers: Dict[int, Tuple[_FileDescriptorLike, Callable]] = {} # Writing to _waker_w will wake up the selector thread, which # watches for _waker_r to be readable. @@ -261,27 +261,45 @@ def _handle_select( def _handle_event( self, fd: _FileDescriptorLike, - cb_map: Dict[_FileDescriptorLike, Callable], + cb_map: Dict[int, Tuple[_FileDescriptorLike, Callable]], ) -> None: try: - callback = cb_map[fd] + fileobj, callback = cb_map[fd] except KeyError: return callback() + def _split_fd(self, fd: _FileDescriptorLike) -> Tuple[int, _FileDescriptorLike]: + """Return fd, file object + + Keeps a handle on the fileobject given, + but always registers integer FD + """ + fileno = fd + if not isinstance(fileno, int): + try: + fileno = int(fileno.fileno()) + except (AttributeError, TypeError, ValueError): + # This code matches selectors._fileobj_to_fd function. + raise ValueError(f"Invalid file object: {fd!r}") from None + return fileno, fd + def add_reader( self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any ) -> None: - self._readers[fd] = functools.partial(callback, *args) + fd, fileobj = self._split_fd(fd) + self._readers[fd] = (fileobj, functools.partial(callback, *args)) self._wake_selector() def add_writer( self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any ) -> None: - self._writers[fd] = functools.partial(callback, *args) + fd, fileobj = self._split_fd(fd) + self._writers[fd] = (fileobj, functools.partial(callback, *args)) self._wake_selector() def remove_reader(self, fd: _FileDescriptorLike) -> bool: + fd, _ = self._split_fd(fd) try: del self._readers[fd] except KeyError: @@ -290,6 +308,7 @@ def remove_reader(self, fd: _FileDescriptorLike) -> bool: return True def remove_writer(self, fd: _FileDescriptorLike) -> bool: + fd, _ = self._split_fd(fd) try: del self._writers[fd] except KeyError: diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index eff1b7601e9d2e..49344b7a4585c4 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -357,17 +357,17 @@ def read(): self.loop.add_reader(b, read) _selector_thread = self.loop._selector_thread - assert b in _selector_thread._readers + assert b.fileno() in _selector_thread._readers assert _selector_thread is not None self.loop.add_writer(a, write) assert self.loop._selector_thread is _selector_thread - assert a in _selector_thread._writers + assert a.fileno() in _selector_thread._writers msg = await asyncio.wait_for(read_future, timeout=10) self.loop.remove_writer(a) - assert a not in _selector_thread._writers + assert a.fileno() not in _selector_thread._writers self.loop.remove_reader(b) - assert b not in _selector_thread._readers + assert b.fileno() not in _selector_thread._readers a.close() b.close() assert self.loop._selector_thread is _selector_thread From a5fa12555e36a9c57689bf8a8229a483ec3f6505 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 24 Nov 2025 09:43:24 -0800 Subject: [PATCH 07/17] Address revieww - remove some older Python compatibility, since this is in the stdlib now - resolve race between selector thread call soon and EventLoop.close --- Lib/asyncio/_selector_thread.py | 62 ++++++++++----------------------- Lib/asyncio/proactor_events.py | 8 ++--- 2 files changed, 22 insertions(+), 48 deletions(-) diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py index 1f3cfcb540e74a..1d3d96414e0076 100644 --- a/Lib/asyncio/_selector_thread.py +++ b/Lib/asyncio/_selector_thread.py @@ -10,8 +10,6 @@ Adapted from Tornado 6.5.2 """ -from __future__ import annotations - import asyncio import atexit import contextvars @@ -25,14 +23,7 @@ from typing import ( Any, Callable, - Dict, - List, - Optional, Protocol, - Set, - Tuple, - TypeVar, - Union, ) @@ -41,12 +32,10 @@ def fileno(self) -> int: pass -_FileDescriptorLike = Union[int, _HasFileno] - -_T = TypeVar("_T") +_FileDescriptorLike = int | _HasFileno # Collection of selector thread event loops to shut down on exit. -_selector_loops: Set["SelectorThread"] = set() +_selector_loops: set["SelectorThread"] = set() def _atexit_callback() -> None: @@ -89,28 +78,21 @@ def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: self._real_loop = real_loop self._select_cond = threading.Condition() - self._select_args: Optional[ - Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]] - ] = None + self._select_args: tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None = None self._closing_selector = False - self._thread: Optional[threading.Thread] = None + self._thread: threading.Thread | None = None self._thread_manager_handle = self._thread_manager() - async def thread_manager_anext() -> None: - # the anext builtin wasn't added until 3.10. We just need to iterate - # this generator one step. - await self._thread_manager_handle.__anext__() - # When the loop starts, start the thread. Not too soon because we can't # clean up if we get to this point but the event loop is closed without # starting. self._real_loop.call_soon( - lambda: self._real_loop.create_task(thread_manager_anext()), + lambda: self._real_loop.create_task(self._thread_manager_handle.__anext__()), context=self._main_thread_ctx, ) - self._readers: Dict[int, Tuple[_FileDescriptorLike, Callable]] = {} - self._writers: Dict[int, Tuple[_FileDescriptorLike, Callable]] = {} + self._readers: dict[int, tuple[_FileDescriptorLike, Callable]] = {} + self._writers: dict[int, tuple[_FileDescriptorLike, Callable]] = {} # Writing to _waker_w will wake up the selector thread, which # watches for _waker_r to be readable. @@ -142,7 +124,7 @@ async def _thread_manager(self) -> typing.AsyncGenerator[None, None]: # can be shut down in this way (non-daemon threads would require the # introduction of a new hook: https://bugs.python.org/issue41962) self._thread = threading.Thread( - name="Tornado selector", + name="Asyncio selector", daemon=True, target=self._run_select, ) @@ -184,7 +166,7 @@ def _start_select(self) -> None: self._select_cond.notify() def _run_select(self) -> None: - while True: + while not self._closing_selector: with self._select_cond: while self._select_args is None and not self._closing_selector: self._select_cond.wait() @@ -232,25 +214,17 @@ def _run_select(self) -> None: else: raise - try: - self._real_loop.call_soon_threadsafe( + # if close has already started, don't schedule callbacks, + # which could cause a race + with self._select_cond: + if self._closing_selector: + return + self._real_loop.call_soon_threadsafe( self._handle_select, rs, ws, context=self._main_thread_ctx ) - except RuntimeError: - # "Event loop is closed". Swallow the exception for - # consistency with PollIOLoop (and logical consistency - # with the fact that we can't guarantee that an - # add_callback that completes without error will - # eventually execute). - pass - except AttributeError: - # ProactorEventLoop may raise this instead of RuntimeError - # if call_soon_threadsafe races with a call to close(). - # Swallow it too for consistency. - pass def _handle_select( - self, rs: List[_FileDescriptorLike], ws: List[_FileDescriptorLike] + self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike] ) -> None: for r in rs: self._handle_event(r, self._readers) @@ -261,7 +235,7 @@ def _handle_select( def _handle_event( self, fd: _FileDescriptorLike, - cb_map: Dict[int, Tuple[_FileDescriptorLike, Callable]], + cb_map: dict[int, tuple[_FileDescriptorLike, Callable]], ) -> None: try: fileobj, callback = cb_map[fd] @@ -269,7 +243,7 @@ def _handle_event( return callback() - def _split_fd(self, fd: _FileDescriptorLike) -> Tuple[int, _FileDescriptorLike]: + def _split_fd(self, fd: _FileDescriptorLike) -> tuple[int, _FileDescriptorLike]: """Return fd, file object Keeps a handle on the fileobject given, diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 3c63f0241636ce..be786cd6a259d3 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -646,7 +646,7 @@ def __init__(self, proactor): def _get_selector_thread(self): """Return the SelectorThread. - creating it on first request, + Creates the thread it on first request, so no thread is created until/unless the first call to `add_reader` and friends. """ @@ -705,14 +705,14 @@ def close(self): # Call these methods before closing the event loop (before calling # BaseEventLoop.close), because they can schedule callbacks with # call_soon(), which is forbidden when the event loop is closed. + if self._selector_thread is not None: + self._selector_thread.close() + self._selector_thread = None self._stop_accept_futures() self._close_self_pipe() self._proactor.close() self._proactor = None self._selector = None - if self._selector_thread is not None: - self._selector_thread.close() - self._selector_thread = None # Close the event loop super().close() From 8c76be908413ece3cfa7896151e8a285db8184be Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 24 Nov 2025 10:18:26 -0800 Subject: [PATCH 08/17] asyncio._selector_thread: avoid daemon thread rely on private threading._register_atexit to run cleanup prior to thread join --- Lib/asyncio/_selector_thread.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py index 1d3d96414e0076..4b968674287ea6 100644 --- a/Lib/asyncio/_selector_thread.py +++ b/Lib/asyncio/_selector_thread.py @@ -47,16 +47,13 @@ def _atexit_callback() -> None: loop._waker_w.send(b"a") except BlockingIOError: pass - if loop._thread is not None: - # If we don't join our (daemon) thread here, we may get a deadlock - # during interpreter shutdown. I don't really understand why. This - # deadlock happens every time in CI (both travis and appveyor) but - # I've never been able to reproduce locally. - loop._thread.join() _selector_loops.clear() -atexit.register(_atexit_callback) +# use internal _register_atexit to avoid need for daemon threads +# I can't find a public API for equivalent functionality +# to run something prior to thread join during process teardown +threading._register_atexit(_atexit_callback) class SelectorThread: @@ -120,18 +117,18 @@ def close(self) -> None: async def _thread_manager(self) -> typing.AsyncGenerator[None, None]: # Create a thread to run the select system call. We manage this thread # manually so we can trigger a clean shutdown from an atexit hook. Note - # that due to the order of operations at shutdown, only daemon threads - # can be shut down in this way (non-daemon threads would require the - # introduction of a new hook: https://bugs.python.org/issue41962) + # that due to the order of operations at shutdown, + # we rely on private `threading._register_atexit` + # to wake the thread before joining to avoid hangs. + # See https://github.com/python/cpython/issues/86128 for more info self._thread = threading.Thread( - name="Asyncio selector", - daemon=True, + name="asyncio selector", target=self._run_select, ) self._thread.start() self._start_select() try: - # The presense of this yield statement means that this coroutine + # The presence of this yield statement means that this coroutine # is actually an asynchronous generator, which has a special # shutdown protocol. We wait at this yield point until the # event loop's shutdown_asyncgens method is called, at which point From 7996d6453f0ba0daaf572c7d838fcb861b8629de Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 24 Nov 2025 10:29:02 -0800 Subject: [PATCH 09/17] add some docstrings to SelectorThread methods based on confusion in feedback --- Lib/asyncio/_selector_thread.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py index 4b968674287ea6..dc8830cc6087f7 100644 --- a/Lib/asyncio/_selector_thread.py +++ b/Lib/asyncio/_selector_thread.py @@ -6,6 +6,10 @@ Compatibility for [add|remove]_[reader|writer] where unavailable (Proactor). Runs select in a background thread. +_Only_ `select.select` is called in the background thread. + +Callbacks are all handled back in the event loop's thread, +as scheduled by `loop.call_soon_threadsafe`. Adapted from Tornado 6.5.2 """ @@ -61,10 +65,8 @@ class SelectorThread: Instances of this class start a second thread to run a selector. This thread is completely hidden from the user; - all callbacks are run on the wrapped event loop's thread. - - Typically used via ``AddThreadSelectorEventLoop``, - but can be attached to a running asyncio loop. + all callbacks are run on the wrapped event loop's thread + via :meth:`loop.call_soon_threadsafe`. """ _closed = False @@ -140,6 +142,7 @@ async def _thread_manager(self) -> typing.AsyncGenerator[None, None]: raise def _wake_selector(self) -> None: + """Wake the selector thread from another thread.""" if self._closed: return try: @@ -148,12 +151,18 @@ def _wake_selector(self) -> None: pass def _consume_waker(self) -> None: + """Consume messages sent via _wake_selector.""" try: self._waker_r.recv(1024) except BlockingIOError: pass def _start_select(self) -> None: + """Start select waiting for events. + + Called from the event loop thread, + schedules select to be called in the background thread. + """ # Capture reader and writer sets here in the event loop # thread to avoid any problems with concurrent # modification while the select loop uses them. @@ -163,6 +172,12 @@ def _start_select(self) -> None: self._select_cond.notify() def _run_select(self) -> None: + """The main function of the select thread. + + Runs `select.select()` until `_closing_selector` attribute is set (typically by `close()`). + Schedules handling of `select.select` output on the main thread + via `loop.call_soon_threadsafe()`. + """ while not self._closing_selector: with self._select_cond: while self._select_args is None and not self._closing_selector: @@ -223,6 +238,10 @@ def _run_select(self) -> None: def _handle_select( self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike] ) -> None: + """Handle the result of select.select. + + This method is called on the event loop thread via `call_soon_threadsafe`. + """ for r in rs: self._handle_event(r, self._readers) for w in ws: @@ -234,6 +253,11 @@ def _handle_event( fd: _FileDescriptorLike, cb_map: dict[int, tuple[_FileDescriptorLike, Callable]], ) -> None: + """Handle one callback event. + + This method is called on the event loop thread via `call_soon_threadsafe` (from `_handle_select`), + so exception handler wrappers, etc. are applied. + """ try: fileobj, callback = cb_map[fd] except KeyError: From c12d733251ac5a03f0d41dc63494dccf288c1093 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 24 Nov 2025 10:48:14 -0800 Subject: [PATCH 10/17] use versionchanged:: next according to devguide --- Doc/library/asyncio-eventloop.rst | 2 +- Doc/library/asyncio-platforms.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index d260fb5004cdc3..7a9c4f59fe306a 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -1051,7 +1051,7 @@ Watching file descriptors See also :ref:`Platform Support ` section for some limitations of these methods. -.. versionchanged:: 3.15 +.. versionchanged:: next Added support for these methods to :class:`ProactorEventLoop`. diff --git a/Doc/library/asyncio-platforms.rst b/Doc/library/asyncio-platforms.rst index 322fa0f3fcda8e..497bd96cfdd21c 100644 --- a/Doc/library/asyncio-platforms.rst +++ b/Doc/library/asyncio-platforms.rst @@ -69,7 +69,7 @@ hardware (availability of `HPET `_) and on the Windows configuration. -.. versionadded:: 3.15 +.. versionadded:: next Support for :meth:`loop.add_reader`, :meth:`loop.add_writer` added to :class:`ProactorEventLoop`. From 502bfafdae204359cc3cb5db5e988d4d759f7a68 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 28 Nov 2025 17:26:49 -0800 Subject: [PATCH 11/17] use Handles to ensure selector thread don't raise and halt handling --- Lib/asyncio/_selector_thread.py | 23 +++- Lib/test/test_asyncio/test_selector_thread.py | 107 ++++++++++++++++++ 2 files changed, 124 insertions(+), 6 deletions(-) create mode 100644 Lib/test/test_asyncio/test_selector_thread.py diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py index dc8830cc6087f7..270f094f1af7e9 100644 --- a/Lib/asyncio/_selector_thread.py +++ b/Lib/asyncio/_selector_thread.py @@ -30,6 +30,8 @@ Protocol, ) +from . import events + class _HasFileno(Protocol): def fileno(self) -> int: @@ -259,10 +261,11 @@ def _handle_event( so exception handler wrappers, etc. are applied. """ try: - fileobj, callback = cb_map[fd] + fileobj, handle = cb_map[fd] except KeyError: return - callback() + if not handle.cancelled(): + handle._run() def _split_fd(self, fd: _FileDescriptorLike) -> tuple[int, _FileDescriptorLike]: """Return fd, file object @@ -283,30 +286,38 @@ def add_reader( self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any ) -> None: fd, fileobj = self._split_fd(fd) - self._readers[fd] = (fileobj, functools.partial(callback, *args)) + if fd in self._readers: + _, handle = self._readers[fd] + handle.cancel() + self._readers[fd] = (fileobj, events.Handle(callback, args, self._real_loop)) self._wake_selector() def add_writer( self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any ) -> None: fd, fileobj = self._split_fd(fd) - self._writers[fd] = (fileobj, functools.partial(callback, *args)) + if fd in self._writers: + _, handle = self._writers[fd] + handle.cancel() + self._writers[fd] = (fileobj, events.Handle(callback, args, self._real_loop)) self._wake_selector() def remove_reader(self, fd: _FileDescriptorLike) -> bool: fd, _ = self._split_fd(fd) try: - del self._readers[fd] + _, handle = self._readers.pop(fd) except KeyError: return False + handle.cancel() self._wake_selector() return True def remove_writer(self, fd: _FileDescriptorLike) -> bool: fd, _ = self._split_fd(fd) try: - del self._writers[fd] + _, handle = self._writers.pop(fd) except KeyError: return False + handle.cancel() self._wake_selector() return True diff --git a/Lib/test/test_asyncio/test_selector_thread.py b/Lib/test/test_asyncio/test_selector_thread.py new file mode 100644 index 00000000000000..71668535fce7bf --- /dev/null +++ b/Lib/test/test_asyncio/test_selector_thread.py @@ -0,0 +1,107 @@ +import asyncio +import select +import socket +import time +import unittest +from asyncio._selector_thread import SelectorThread +from unittest import mock + + +class SelectorThreadTest((unittest.IsolatedAsyncioTestCase)): + async def asyncSetUp(self): + self._sockets = [] + self.selector_thread = SelectorThread(asyncio.get_running_loop()) + + def socketpair(self): + pair = socket.socketpair() + self._sockets.extend(pair) + return pair + + async def asyncTearDown(self): + self.selector_thread.close() + for s in self._sockets: + s.close() + + async def test_slow_reader(self): + a, b = self.socketpair() + def recv(): + b.recv(100) + mock_recv = mock.MagicMock(wraps=recv) + # make sure select is only called once when + # event loop thread is slow to consume events + a.sendall(b"msg") + with mock.patch("select.select", wraps=select.select) as mock_select: + self.selector_thread.add_reader(b, mock_recv) + time.sleep(0.1) + self.assertEqual(mock_select.call_count, 1) + await asyncio.sleep(0.1) + self.assertEqual(mock_recv.call_count, 1) + + async def test_reader_error(self): + # test error handling in callbacks doesn't break handling + a, b = self.socketpair() + a.sendall(b"to_b") + + selector_thread = self.selector_thread + + # make sure it's called a few + n_failures = 5 + counter = 0 + bad_recv_done = asyncio.Future() + def bad_recv(sock): + # fail the first n_failures calls, then succeed + nonlocal counter + counter += 1 + if counter > n_failures: + bad_recv_done.set_result(None) + sock.recv(10) + return + raise Exception("Testing reader error") + + recv_callback = mock.MagicMock(wraps=bad_recv) + + exception_handler = mock.MagicMock() + asyncio.get_running_loop().set_exception_handler(exception_handler) + + selector_thread.add_reader(b, recv_callback, b) + + # make sure start_select is called + # even when recv callback errors, + with mock.patch.object(selector_thread, "_start_select", wraps=selector_thread._start_select) as start_select: + await asyncio.wait_for(bad_recv_done, timeout=10) + + # make sure recv is called N + 1 times, + # exception N times, + # start_select at least that many + self.assertEqual(recv_callback.call_count, n_failures + 1) + self.assertEqual(exception_handler.call_count, n_failures) + self.assertGreaterEqual(start_select.call_count, n_failures) + + async def test_read_write(self): + a, b = self.socketpair() + read_future = asyncio.Future() + sent = b"asdf" + loop = asyncio.get_running_loop() + selector_thread = self.selector_thread + + def write(): + a.sendall(sent) + loop.remove_writer(a) + + def read(): + msg = b.recv(100) + read_future.set_result(msg) + + selector_thread.add_reader(b, read) + assert b.fileno() in selector_thread._readers + selector_thread.add_writer(a, write) + assert a.fileno() in selector_thread._writers + msg = await asyncio.wait_for(read_future, timeout=10) + + selector_thread.remove_writer(a) + assert a.fileno() not in selector_thread._writers + selector_thread.remove_reader(b) + assert b.fileno() not in selector_thread._readers + a.close() + b.close() + assert msg == sent From eb8739e676d8c4eabdeace998c6f58c793f76b98 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 1 Dec 2025 09:29:33 -0800 Subject: [PATCH 12/17] more robust selector thread test for thread asynciness --- Lib/test/test_asyncio/test_selector_thread.py | 26 +++++++++++++++---- Lib/test/test_asyncio/test_windows_events.py | 3 ++- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_asyncio/test_selector_thread.py b/Lib/test/test_asyncio/test_selector_thread.py index 71668535fce7bf..194ab2f83c4075 100644 --- a/Lib/test/test_asyncio/test_selector_thread.py +++ b/Lib/test/test_asyncio/test_selector_thread.py @@ -24,18 +24,30 @@ async def asyncTearDown(self): async def test_slow_reader(self): a, b = self.socketpair() + first_recv = asyncio.Future() + def recv(): - b.recv(100) + msg = b.recv(100) + if not first_recv.done(): + first_recv.set_result(msg) + mock_recv = mock.MagicMock(wraps=recv) # make sure select is only called once when # event loop thread is slow to consume events a.sendall(b"msg") with mock.patch("select.select", wraps=select.select) as mock_select: self.selector_thread.add_reader(b, mock_recv) + # ready event, but main event loop is blocked for some time time.sleep(0.1) - self.assertEqual(mock_select.call_count, 1) - await asyncio.sleep(0.1) + recvd = await asyncio.wait_for(first_recv, timeout=10) + self.assertEqual(recvd, b"msg") + # make sure recv wasn't scheduled more than once self.assertEqual(mock_recv.call_count, 1) + # 1 for add_reader + # 1 for finishing reader callback + # up to 2 more for wake FD calls if CI is slow + # this would be thousands if select is busy-looping while the main thread blocks + self.assertLessEqual(mock_select.call_count, 5) async def test_reader_error(self): # test error handling in callbacks doesn't break handling @@ -44,10 +56,12 @@ async def test_reader_error(self): selector_thread = self.selector_thread - # make sure it's called a few + # make sure it's called a few times, + # and errors don't prevent rescheduling n_failures = 5 counter = 0 bad_recv_done = asyncio.Future() + def bad_recv(sock): # fail the first n_failures calls, then succeed nonlocal counter @@ -67,7 +81,9 @@ def bad_recv(sock): # make sure start_select is called # even when recv callback errors, - with mock.patch.object(selector_thread, "_start_select", wraps=selector_thread._start_select) as start_select: + with mock.patch.object( + selector_thread, "_start_select", wraps=selector_thread._start_select + ) as start_select: await asyncio.wait_for(bad_recv_done, timeout=10) # make sure recv is called N + 1 times, diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 49344b7a4585c4..e673f96563272b 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -325,7 +325,7 @@ def threadMain(): def test_add_reader_invalid_argument(self): def assert_raises(): - return self.assertRaisesRegex(ValueError, r'Invalid file object') + return self.assertRaisesRegex(ValueError, r"Invalid file object") def cb(sock): return None @@ -343,6 +343,7 @@ def cb(sock): def test_selector_thread(self): assert self.loop._selector_thread is None a, b = socket.socketpair() + async def _test(): read_future = asyncio.Future() sent = b"asdf" From 0b017d00c3c63639337a243b6919087b9eb10082 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 1 Dec 2025 20:12:05 -0800 Subject: [PATCH 13/17] use assert methods --- Lib/test/test_asyncio/test_selector_thread.py | 10 +++++----- Lib/test/test_asyncio/test_windows_events.py | 18 +++++++++--------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Lib/test/test_asyncio/test_selector_thread.py b/Lib/test/test_asyncio/test_selector_thread.py index 194ab2f83c4075..3cb5475c06bde9 100644 --- a/Lib/test/test_asyncio/test_selector_thread.py +++ b/Lib/test/test_asyncio/test_selector_thread.py @@ -109,15 +109,15 @@ def read(): read_future.set_result(msg) selector_thread.add_reader(b, read) - assert b.fileno() in selector_thread._readers + self.assertIn(b.fileno(), selector_thread._readers) selector_thread.add_writer(a, write) - assert a.fileno() in selector_thread._writers + self.assertIn(a.fileno(), selector_thread._writers) msg = await asyncio.wait_for(read_future, timeout=10) selector_thread.remove_writer(a) - assert a.fileno() not in selector_thread._writers + self.assertNotIn(a.fileno() , selector_thread._writers) selector_thread.remove_reader(b) - assert b.fileno() not in selector_thread._readers + self.assertNotIn(b.fileno() , selector_thread._readers) a.close() b.close() - assert msg == sent + self.assertEqual(msg, sent) diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index e673f96563272b..551da75c680666 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -341,7 +341,7 @@ def cb(sock): self.loop.remove_writer(object()) def test_selector_thread(self): - assert self.loop._selector_thread is None + self.assertIsNone(self.loop._selector_thread) a, b = socket.socketpair() async def _test(): @@ -358,21 +358,21 @@ def read(): self.loop.add_reader(b, read) _selector_thread = self.loop._selector_thread - assert b.fileno() in _selector_thread._readers - assert _selector_thread is not None + self.assertIn(b.fileno(), _selector_thread._readers) + self.assertIsNotNone(_selector_thread) self.loop.add_writer(a, write) - assert self.loop._selector_thread is _selector_thread - assert a.fileno() in _selector_thread._writers + self.assertIs(self.loop._selector_thread, _selector_thread) + self.assertIn(a.fileno(), _selector_thread._writers) msg = await asyncio.wait_for(read_future, timeout=10) self.loop.remove_writer(a) - assert a.fileno() not in _selector_thread._writers + self.assertNotIn(a.fileno(), _selector_thread._writers) self.loop.remove_reader(b) - assert b.fileno() not in _selector_thread._readers + self.assertNotIn(b.fileno(), _selector_thread._readers) a.close() b.close() - assert self.loop._selector_thread is _selector_thread - assert msg == sent + self.assertIs(self.loop._selector_thread, _selector_thread) + self.assertEqual(msg, sent) self.loop.run_until_complete(_test()) From 77c304ff9d83778c4220298fb8092fdbc9c06638 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 5 Dec 2025 09:32:11 -0800 Subject: [PATCH 14/17] Apply suggestions from code review Co-authored-by: Victor Stinner --- Lib/test/test_asyncio/test_selector_thread.py | 6 +++--- Lib/test/test_asyncio/test_windows_events.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_asyncio/test_selector_thread.py b/Lib/test/test_asyncio/test_selector_thread.py index 3cb5475c06bde9..3ec3c81a45a879 100644 --- a/Lib/test/test_asyncio/test_selector_thread.py +++ b/Lib/test/test_asyncio/test_selector_thread.py @@ -7,7 +7,7 @@ from unittest import mock -class SelectorThreadTest((unittest.IsolatedAsyncioTestCase)): +class SelectorThreadTest(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): self._sockets = [] self.selector_thread = SelectorThread(asyncio.get_running_loop()) @@ -39,7 +39,7 @@ def recv(): self.selector_thread.add_reader(b, mock_recv) # ready event, but main event loop is blocked for some time time.sleep(0.1) - recvd = await asyncio.wait_for(first_recv, timeout=10) + recvd = await asyncio.wait_for(first_recv, timeout=support.SHORT_TIMEOUT) self.assertEqual(recvd, b"msg") # make sure recv wasn't scheduled more than once self.assertEqual(mock_recv.call_count, 1) @@ -84,7 +84,7 @@ def bad_recv(sock): with mock.patch.object( selector_thread, "_start_select", wraps=selector_thread._start_select ) as start_select: - await asyncio.wait_for(bad_recv_done, timeout=10) + await asyncio.wait_for(bad_recv_done, timeout=support.SHORT_TIMEOUT) # make sure recv is called N + 1 times, # exception N times, diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 551da75c680666..370c7579697ba1 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -363,7 +363,7 @@ def read(): self.loop.add_writer(a, write) self.assertIs(self.loop._selector_thread, _selector_thread) self.assertIn(a.fileno(), _selector_thread._writers) - msg = await asyncio.wait_for(read_future, timeout=10) + msg = await asyncio.wait_for(read_future, timeout=support.SHORT_TIMEOUT) self.loop.remove_writer(a) self.assertNotIn(a.fileno(), _selector_thread._writers) From f75f7c57d51efdd7c210d3185893000045333c7a Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 5 Dec 2025 09:35:00 -0800 Subject: [PATCH 15/17] from test import support --- Lib/test/test_asyncio/test_selector_thread.py | 5 +++-- Lib/test/test_asyncio/test_windows_events.py | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_selector_thread.py b/Lib/test/test_asyncio/test_selector_thread.py index 3ec3c81a45a879..03300e67f8578a 100644 --- a/Lib/test/test_asyncio/test_selector_thread.py +++ b/Lib/test/test_asyncio/test_selector_thread.py @@ -4,6 +4,7 @@ import time import unittest from asyncio._selector_thread import SelectorThread +from test import support from unittest import mock @@ -115,9 +116,9 @@ def read(): msg = await asyncio.wait_for(read_future, timeout=10) selector_thread.remove_writer(a) - self.assertNotIn(a.fileno() , selector_thread._writers) + self.assertNotIn(a.fileno(), selector_thread._writers) selector_thread.remove_reader(b) - self.assertNotIn(b.fileno() , selector_thread._readers) + self.assertNotIn(b.fileno(), selector_thread._readers) a.close() b.close() self.assertEqual(msg, sent) diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 370c7579697ba1..fa3a33f4185ee7 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -5,6 +5,7 @@ import time import threading import unittest +from test import support from unittest import mock if sys.platform != 'win32': From 75c328b5fa3a0af6c52a58c133b4e48832597709 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 5 Dec 2025 09:35:53 -0800 Subject: [PATCH 16/17] apply suggestions from review Co-authored-by: Victor Stinner --- Lib/asyncio/_selector_thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py index 270f094f1af7e9..edade54898cf89 100644 --- a/Lib/asyncio/_selector_thread.py +++ b/Lib/asyncio/_selector_thread.py @@ -271,7 +271,7 @@ def _split_fd(self, fd: _FileDescriptorLike) -> tuple[int, _FileDescriptorLike]: """Return fd, file object Keeps a handle on the fileobject given, - but always registers integer FD + but always registers integer FD. """ fileno = fd if not isinstance(fileno, int): From 567f751b432a539f3b4008ea63ec0cc35007e2bb Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 5 Dec 2025 10:08:11 -0800 Subject: [PATCH 17/17] copy event loop policy teardown from other asyncio tests --- Lib/test/test_asyncio/test_selector_thread.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/test/test_asyncio/test_selector_thread.py b/Lib/test/test_asyncio/test_selector_thread.py index 03300e67f8578a..7721e216436e0d 100644 --- a/Lib/test/test_asyncio/test_selector_thread.py +++ b/Lib/test/test_asyncio/test_selector_thread.py @@ -8,6 +8,10 @@ from unittest import mock +def tearDownModule(): + asyncio.events._set_event_loop_policy(None) + + class SelectorThreadTest(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): self._sockets = []