From b94d5a7ff99a6795617bc07ed7c94408c9fc5afd Mon Sep 17 00:00:00 2001 From: Donghyun Kim Date: Wed, 15 Nov 2023 01:48:31 +0900 Subject: [PATCH 01/12] Remove unneeded mutex and semaphore --- qasync/_windows.py | 88 +++++++++++++++++++--------------------------- 1 file changed, 36 insertions(+), 52 deletions(-) diff --git a/qasync/_windows.py b/qasync/_windows.py index e11a027..4b49ae2 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -61,7 +61,6 @@ class _IocpProactor(windows_events.IocpProactor): def __init__(self): self.__events = [] super(_IocpProactor, self).__init__() - self._lock = QtCore.QMutex() def select(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" @@ -79,53 +78,42 @@ def close(self): # in the order they appear in the base class source code. def recv(self, conn, nbytes, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).recv(conn, nbytes, flags) + return super(_IocpProactor, self).recv(conn, nbytes, flags) def recv_into(self, conn, buf, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).recv_into(conn, buf, flags) + return super(_IocpProactor, self).recv_into(conn, buf, flags) def recvfrom(self, conn, nbytes, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).recvfrom(conn, nbytes, flags) + return super(_IocpProactor, self).recvfrom(conn, nbytes, flags) def recvfrom_into(self, conn, buf, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).recvfrom_into(conn, buf, flags) + return super(_IocpProactor, self).recvfrom_into(conn, buf, flags) def sendto(self, conn, buf, flags=0, addr=None): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).sendto(conn, buf, flags, addr) + return super(_IocpProactor, self).sendto(conn, buf, flags, addr) def send(self, conn, buf, flags=0): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).send(conn, buf, flags) + return super(_IocpProactor, self).send(conn, buf, flags) def accept(self, listener): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).accept(listener) + return super(_IocpProactor, self).accept(listener) def connect(self, conn, address): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).connect(conn, address) + return super(_IocpProactor, self).connect(conn, address) def sendfile(self, sock, file, offset, count): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).sendfile(sock, file, offset, count) + return super(_IocpProactor, self).sendfile(sock, file, offset, count) def accept_pipe(self, pipe): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).accept_pipe(pipe) + return super(_IocpProactor, self).accept_pipe(pipe) # connect_pipe() does not actually use the delayed completion machinery. # This takes care of wait_for_handle() too. def _wait_for_handle(self, handle, timeout, _is_cancel): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self)._wait_for_handle( - handle, timeout, _is_cancel - ) + return super(_IocpProactor, self)._wait_for_handle( + handle, timeout, _is_cancel + ) def _poll(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" @@ -140,30 +128,29 @@ def _poll(self, timeout=None): if ms >= UINT32_MAX: raise ValueError("timeout too big") - with QtCore.QMutexLocker(self._lock): - while True: - # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( - # ms, threading.get_ident())) - status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) - if status is None: - break - ms = 0 - - err, transferred, key, address = status - try: - f, ov, obj, callback = self._cache.pop(address) - except KeyError: - # key is either zero, or it is used to return a pipe - # handle which should be closed to avoid a leak. - if key not in (0, _overlapped.INVALID_HANDLE_VALUE): - _winapi.CloseHandle(key) - continue - - if obj in self._stopped_serving: - f.cancel() - # Futures might already be resolved or cancelled - elif not f.done(): - self.__events.append((f, callback, transferred, key, ov)) + while True: + # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( + # ms, threading.get_ident())) + status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) + if status is None: + break + ms = 0 + + err, transferred, key, address = status + try: + f, ov, obj, callback = self._cache.pop(address) + except KeyError: + # key is either zero, or it is used to return a pipe + # handle which should be closed to avoid a leak. + if key not in (0, _overlapped.INVALID_HANDLE_VALUE): + _winapi.CloseHandle(key) + continue + + if obj in self._stopped_serving: + f.cancel() + # Futures might already be resolved or cancelled + elif not f.done(): + self.__events.append((f, callback, transferred, key, ov)) # Remove unregistered futures for ov in self._unregistered: @@ -179,11 +166,9 @@ def __init__(self, proactor, parent): self.__stop = False self.__proactor = proactor self.__sig_events = parent.sig_events - self.__semaphore = QtCore.QSemaphore() def start(self): super().start() - self.__semaphore.acquire() def stop(self): self.__stop = True @@ -192,7 +177,6 @@ def stop(self): def run(self): self._logger.debug("Thread started") - self.__semaphore.release() while not self.__stop: events = self.__proactor.select(0.01) From e7a7b68539e7350dec265643d19f09b7fc08134a Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Wed, 15 Nov 2023 02:42:37 +0900 Subject: [PATCH 02/12] Remove unneeded methods --- qasync/_windows.py | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/qasync/_windows.py b/qasync/_windows.py index 4b49ae2..515feda 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -74,47 +74,12 @@ def close(self): self._logger.debug("Closing") super(_IocpProactor, self).close() - # Wrap all I/O submission methods to acquire the internal lock first; listed - # in the order they appear in the base class source code. - def recv(self, conn, nbytes, flags=0): return super(_IocpProactor, self).recv(conn, nbytes, flags) - def recv_into(self, conn, buf, flags=0): - return super(_IocpProactor, self).recv_into(conn, buf, flags) - - def recvfrom(self, conn, nbytes, flags=0): - return super(_IocpProactor, self).recvfrom(conn, nbytes, flags) - - def recvfrom_into(self, conn, buf, flags=0): - return super(_IocpProactor, self).recvfrom_into(conn, buf, flags) - - def sendto(self, conn, buf, flags=0, addr=None): - return super(_IocpProactor, self).sendto(conn, buf, flags, addr) - def send(self, conn, buf, flags=0): return super(_IocpProactor, self).send(conn, buf, flags) - def accept(self, listener): - return super(_IocpProactor, self).accept(listener) - - def connect(self, conn, address): - return super(_IocpProactor, self).connect(conn, address) - - def sendfile(self, sock, file, offset, count): - return super(_IocpProactor, self).sendfile(sock, file, offset, count) - - def accept_pipe(self, pipe): - return super(_IocpProactor, self).accept_pipe(pipe) - - # connect_pipe() does not actually use the delayed completion machinery. - - # This takes care of wait_for_handle() too. - def _wait_for_handle(self, handle, timeout, _is_cancel): - return super(_IocpProactor, self)._wait_for_handle( - handle, timeout, _is_cancel - ) - def _poll(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" if timeout is None: From 200cba52d8882a6bea7d2ede220eeb80dd6616b0 Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Wed, 15 Nov 2023 02:45:11 +0900 Subject: [PATCH 03/12] Remove more methods --- qasync/_windows.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/qasync/_windows.py b/qasync/_windows.py index 515feda..ea42440 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -74,12 +74,6 @@ def close(self): self._logger.debug("Closing") super(_IocpProactor, self).close() - def recv(self, conn, nbytes, flags=0): - return super(_IocpProactor, self).recv(conn, nbytes, flags) - - def send(self, conn, buf, flags=0): - return super(_IocpProactor, self).send(conn, buf, flags) - def _poll(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" if timeout is None: From b0ff584420d6b44ceb11a8a215ea568ce4fd28dd Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Wed, 15 Nov 2023 17:40:52 +0900 Subject: [PATCH 04/12] Remove Windows-specific code --- qasync/__init__.py | 10 +-- qasync/_windows.py | 164 --------------------------------------------- 2 files changed, 1 insertion(+), 173 deletions(-) delete mode 100644 qasync/_windows.py diff --git a/qasync/__init__.py b/qasync/__init__.py index d9dc8aa..ebc421d 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -740,15 +740,7 @@ def __log_error(cls, *args, **kwds): from ._unix import _SelectorEventLoop # noqa -QSelectorEventLoop = type("QSelectorEventLoop", (_QEventLoop, _SelectorEventLoop), {}) - -if os.name == "nt": - from ._windows import _ProactorEventLoop - - QIOCPEventLoop = type("QIOCPEventLoop", (_QEventLoop, _ProactorEventLoop), {}) - QEventLoop = QIOCPEventLoop -else: - QEventLoop = QSelectorEventLoop +QEventLoop = type("QSelectorEventLoop", (_QEventLoop, _SelectorEventLoop), {}) class _Cancellable: diff --git a/qasync/_windows.py b/qasync/_windows.py deleted file mode 100644 index ea42440..0000000 --- a/qasync/_windows.py +++ /dev/null @@ -1,164 +0,0 @@ -# © 2018 Gerard Marull-Paretas -# © 2014 Mark Harviston -# © 2014 Arve Knudsen -# BSD License - -"""Windows specific Quamash functionality.""" - -import asyncio -import sys - -try: - import _winapi - from asyncio import windows_events - import _overlapped -except ImportError: # noqa - pass # w/o guarding this import py.test can't gather doctests on platforms w/o _winapi - -import math - -from . import QtCore, _make_signaller -from ._common import with_logger - -UINT32_MAX = 0xFFFFFFFF - - -class _ProactorEventLoop(asyncio.ProactorEventLoop): - - """Proactor based event loop.""" - - def __init__(self): - super().__init__(_IocpProactor()) - - self.__event_signaller = _make_signaller(QtCore, list) - self.__event_signal = self.__event_signaller.signal - self.__event_signal.connect(self._process_events) - self.__event_poller = _EventPoller(self.__event_signal) - - def _process_events(self, events): - """Process events from proactor.""" - for f, callback, transferred, key, ov in events: - try: - self._logger.debug("Invoking event callback %s", callback) - value = callback(transferred, key, ov) - except OSError as e: - self._logger.debug("Event callback failed", exc_info=sys.exc_info()) - if not f.done(): - f.set_exception(e) - else: - if not f.cancelled(): - f.set_result(value) - - def _before_run_forever(self): - self.__event_poller.start(self._proactor) - - def _after_run_forever(self): - self.__event_poller.stop() - - -@with_logger -class _IocpProactor(windows_events.IocpProactor): - def __init__(self): - self.__events = [] - super(_IocpProactor, self).__init__() - - def select(self, timeout=None): - """Override in order to handle events in a threadsafe manner.""" - if not self.__events: - self._poll(timeout) - tmp = self.__events - self.__events = [] - return tmp - - def close(self): - self._logger.debug("Closing") - super(_IocpProactor, self).close() - - def _poll(self, timeout=None): - """Override in order to handle events in a threadsafe manner.""" - if timeout is None: - ms = UINT32_MAX # wait for eternity - elif timeout < 0: - raise ValueError("negative timeout") - else: - # GetQueuedCompletionStatus() has a resolution of 1 millisecond, - # round away from zero to wait *at least* timeout seconds. - ms = math.ceil(timeout * 1e3) - if ms >= UINT32_MAX: - raise ValueError("timeout too big") - - while True: - # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( - # ms, threading.get_ident())) - status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) - if status is None: - break - ms = 0 - - err, transferred, key, address = status - try: - f, ov, obj, callback = self._cache.pop(address) - except KeyError: - # key is either zero, or it is used to return a pipe - # handle which should be closed to avoid a leak. - if key not in (0, _overlapped.INVALID_HANDLE_VALUE): - _winapi.CloseHandle(key) - continue - - if obj in self._stopped_serving: - f.cancel() - # Futures might already be resolved or cancelled - elif not f.done(): - self.__events.append((f, callback, transferred, key, ov)) - - # Remove unregistered futures - for ov in self._unregistered: - self._cache.pop(ov.address, None) - self._unregistered.clear() - - -@with_logger -class _EventWorker(QtCore.QThread): - def __init__(self, proactor, parent): - super().__init__() - - self.__stop = False - self.__proactor = proactor - self.__sig_events = parent.sig_events - - def start(self): - super().start() - - def stop(self): - self.__stop = True - # Wait for thread to end - self.wait() - - def run(self): - self._logger.debug("Thread started") - - while not self.__stop: - events = self.__proactor.select(0.01) - if events: - self._logger.debug("Got events from poll: %s", events) - self.__sig_events.emit(events) - - self._logger.debug("Exiting thread") - - -@with_logger -class _EventPoller: - - """Polling of events in separate thread.""" - - def __init__(self, sig_events): - self.sig_events = sig_events - - def start(self, proactor): - self._logger.debug("Starting (proactor: %s)...", proactor) - self.__worker = _EventWorker(proactor, self) - self.__worker.start() - - def stop(self): - self._logger.debug("Stopping worker thread...") - self.__worker.stop() From f1118cf4986ee25e39f7621c46f04f5260dd0d8d Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Wed, 15 Nov 2023 18:18:50 +0900 Subject: [PATCH 05/12] Revert "Remove Windows-specific code" This reverts commit b0ff584420d6b44ceb11a8a215ea568ce4fd28dd. --- qasync/__init__.py | 10 ++- qasync/_windows.py | 164 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 qasync/_windows.py diff --git a/qasync/__init__.py b/qasync/__init__.py index ebc421d..d9dc8aa 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -740,7 +740,15 @@ def __log_error(cls, *args, **kwds): from ._unix import _SelectorEventLoop # noqa -QEventLoop = type("QSelectorEventLoop", (_QEventLoop, _SelectorEventLoop), {}) +QSelectorEventLoop = type("QSelectorEventLoop", (_QEventLoop, _SelectorEventLoop), {}) + +if os.name == "nt": + from ._windows import _ProactorEventLoop + + QIOCPEventLoop = type("QIOCPEventLoop", (_QEventLoop, _ProactorEventLoop), {}) + QEventLoop = QIOCPEventLoop +else: + QEventLoop = QSelectorEventLoop class _Cancellable: diff --git a/qasync/_windows.py b/qasync/_windows.py new file mode 100644 index 0000000..ea42440 --- /dev/null +++ b/qasync/_windows.py @@ -0,0 +1,164 @@ +# © 2018 Gerard Marull-Paretas +# © 2014 Mark Harviston +# © 2014 Arve Knudsen +# BSD License + +"""Windows specific Quamash functionality.""" + +import asyncio +import sys + +try: + import _winapi + from asyncio import windows_events + import _overlapped +except ImportError: # noqa + pass # w/o guarding this import py.test can't gather doctests on platforms w/o _winapi + +import math + +from . import QtCore, _make_signaller +from ._common import with_logger + +UINT32_MAX = 0xFFFFFFFF + + +class _ProactorEventLoop(asyncio.ProactorEventLoop): + + """Proactor based event loop.""" + + def __init__(self): + super().__init__(_IocpProactor()) + + self.__event_signaller = _make_signaller(QtCore, list) + self.__event_signal = self.__event_signaller.signal + self.__event_signal.connect(self._process_events) + self.__event_poller = _EventPoller(self.__event_signal) + + def _process_events(self, events): + """Process events from proactor.""" + for f, callback, transferred, key, ov in events: + try: + self._logger.debug("Invoking event callback %s", callback) + value = callback(transferred, key, ov) + except OSError as e: + self._logger.debug("Event callback failed", exc_info=sys.exc_info()) + if not f.done(): + f.set_exception(e) + else: + if not f.cancelled(): + f.set_result(value) + + def _before_run_forever(self): + self.__event_poller.start(self._proactor) + + def _after_run_forever(self): + self.__event_poller.stop() + + +@with_logger +class _IocpProactor(windows_events.IocpProactor): + def __init__(self): + self.__events = [] + super(_IocpProactor, self).__init__() + + def select(self, timeout=None): + """Override in order to handle events in a threadsafe manner.""" + if not self.__events: + self._poll(timeout) + tmp = self.__events + self.__events = [] + return tmp + + def close(self): + self._logger.debug("Closing") + super(_IocpProactor, self).close() + + def _poll(self, timeout=None): + """Override in order to handle events in a threadsafe manner.""" + if timeout is None: + ms = UINT32_MAX # wait for eternity + elif timeout < 0: + raise ValueError("negative timeout") + else: + # GetQueuedCompletionStatus() has a resolution of 1 millisecond, + # round away from zero to wait *at least* timeout seconds. + ms = math.ceil(timeout * 1e3) + if ms >= UINT32_MAX: + raise ValueError("timeout too big") + + while True: + # self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format( + # ms, threading.get_ident())) + status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) + if status is None: + break + ms = 0 + + err, transferred, key, address = status + try: + f, ov, obj, callback = self._cache.pop(address) + except KeyError: + # key is either zero, or it is used to return a pipe + # handle which should be closed to avoid a leak. + if key not in (0, _overlapped.INVALID_HANDLE_VALUE): + _winapi.CloseHandle(key) + continue + + if obj in self._stopped_serving: + f.cancel() + # Futures might already be resolved or cancelled + elif not f.done(): + self.__events.append((f, callback, transferred, key, ov)) + + # Remove unregistered futures + for ov in self._unregistered: + self._cache.pop(ov.address, None) + self._unregistered.clear() + + +@with_logger +class _EventWorker(QtCore.QThread): + def __init__(self, proactor, parent): + super().__init__() + + self.__stop = False + self.__proactor = proactor + self.__sig_events = parent.sig_events + + def start(self): + super().start() + + def stop(self): + self.__stop = True + # Wait for thread to end + self.wait() + + def run(self): + self._logger.debug("Thread started") + + while not self.__stop: + events = self.__proactor.select(0.01) + if events: + self._logger.debug("Got events from poll: %s", events) + self.__sig_events.emit(events) + + self._logger.debug("Exiting thread") + + +@with_logger +class _EventPoller: + + """Polling of events in separate thread.""" + + def __init__(self, sig_events): + self.sig_events = sig_events + + def start(self, proactor): + self._logger.debug("Starting (proactor: %s)...", proactor) + self.__worker = _EventWorker(proactor, self) + self.__worker.start() + + def stop(self): + self._logger.debug("Stopping worker thread...") + self.__worker.stop() From abac70ae7f3120ea183aba37ea083bfb900fc9a2 Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Wed, 15 Nov 2023 19:14:39 +0900 Subject: [PATCH 06/12] Restore semaphore --- qasync/_windows.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/qasync/_windows.py b/qasync/_windows.py index ea42440..08c8c08 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -125,9 +125,11 @@ def __init__(self, proactor, parent): self.__stop = False self.__proactor = proactor self.__sig_events = parent.sig_events + self.__semaphore = QtCore.QSemaphore() def start(self): super().start() + self.__semaphore.acquire() def stop(self): self.__stop = True @@ -136,6 +138,7 @@ def stop(self): def run(self): self._logger.debug("Thread started") + self.__semaphore.release() while not self.__stop: events = self.__proactor.select(0.01) From 60e3f35635517b15f5f46434ad07616b400fe22e Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Wed, 15 Nov 2023 19:46:58 +0900 Subject: [PATCH 07/12] Revert "Restore semaphore" This reverts commit abac70ae7f3120ea183aba37ea083bfb900fc9a2. --- qasync/_windows.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/qasync/_windows.py b/qasync/_windows.py index 08c8c08..ea42440 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -125,11 +125,9 @@ def __init__(self, proactor, parent): self.__stop = False self.__proactor = proactor self.__sig_events = parent.sig_events - self.__semaphore = QtCore.QSemaphore() def start(self): super().start() - self.__semaphore.acquire() def stop(self): self.__stop = True @@ -138,7 +136,6 @@ def stop(self): def run(self): self._logger.debug("Thread started") - self.__semaphore.release() while not self.__stop: events = self.__proactor.select(0.01) From f6f716f36a1e77860efd0259ebe95cd24f7a0d03 Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Wed, 15 Nov 2023 19:57:33 +0900 Subject: [PATCH 08/12] Do not perform regression test on Windows --- tests/test_qeventloop.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index c1de714..88eb22d 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -553,6 +553,9 @@ def test_regression_bug13(loop, sock_pair): c_sock, s_sock = sock_pair client_done, server_done = asyncio.Future(), asyncio.Future() + if os.name == "nt": + return + async def server_coro(): s_reader, s_writer = await asyncio.open_connection(sock=s_sock) From a39a52d0c1fdd66bd610ab42a09491fd2ed6163c Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Thu, 16 Nov 2023 22:09:18 +0900 Subject: [PATCH 09/12] Use `exec()` over `exec_()` --- qasync/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/qasync/__init__.py b/qasync/__init__.py index d9dc8aa..3363e97 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -373,10 +373,10 @@ def run_forever(self): self.__log_debug("Starting Qt event loop") asyncio.events._set_running_loop(self) rslt = -1 - if hasattr(self.__app, "exec_"): - rslt = self.__app.exec_() - else: + if hasattr(self.__app, "exec"): rslt = self.__app.exec() + else: + rslt = self.__app.exec_() self.__log_debug("Qt event loop ended with result %s", rslt) return rslt finally: From 3fec18efcbd1f7877481bbfe06a9a4fa646ab3e2 Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Thu, 16 Nov 2023 23:07:13 +0900 Subject: [PATCH 10/12] Fix hanging --- qasync/_windows.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/qasync/_windows.py b/qasync/_windows.py index ea42440..6150435 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -71,8 +71,36 @@ def select(self, timeout=None): return tmp def close(self): - self._logger.debug("Closing") - super(_IocpProactor, self).close() + if self._iocp is None: + # already closed + return + + # Cancel remaining registered operations. + for fut, ov, obj, callback in list(self._cache.values()): + if fut.cancelled(): + # Nothing to do with cancelled futures + pass + elif isinstance(fut, windows_events._WaitCancelFuture): + # _WaitCancelFuture must not be cancelled + pass + else: + try: + fut.cancel() + except OSError as exc: + if self._loop is not None: + context = { + "message": "Cancelling a future failed", + "exception": exc, + "future": fut, + } + if fut._source_traceback: + context["source_traceback"] = fut._source_traceback + self._loop.call_exception_handler(context) + + self._results = [] + + _winapi.CloseHandle(self._iocp) + self._iocp = None def _poll(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" From 800d1bbcf16ce36c26906039680a8b1af4ebe1c9 Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Thu, 16 Nov 2023 23:15:16 +0900 Subject: [PATCH 11/12] Add comments --- tests/test_qeventloop.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index 88eb22d..7dcbd18 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -554,6 +554,11 @@ def test_regression_bug13(loop, sock_pair): client_done, server_done = asyncio.Future(), asyncio.Future() if os.name == "nt": + # On Windows, `loop.add_reader` and `loop.add_writer` + # are not supported by Python's `asyncio` due to platform limitations. + # Though `qasync` does provide those methods on Windows, + # it doesn't guarantee safety against race conditions like on Unix. + # https://docs.python.org/3/library/asyncio-platforms.html return async def server_coro(): From 30c76052ffbd49d8b5f583936b99fca653bbd7f8 Mon Sep 17 00:00:00 2001 From: DongHyun Kim Date: Fri, 17 Nov 2023 00:01:51 +0900 Subject: [PATCH 12/12] Register a pytest marker to avoid warning --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index b47fc7e..312daca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,3 +51,6 @@ build-backend = "poetry.core.masonry.api" [tool.pytest] addopts = "-n auto" testpaths = ["tests"] + +[tool.pytest.ini_options] +markers = ["raises"]