From 5f6580ee3b865dd6a69f52a081c988ea65f4ff4e Mon Sep 17 00:00:00 2001
From: David Nadlinger
Date: Sun, 1 Oct 2023 20:54:31 +0100
Subject: [PATCH] _IocpProactor: Acquire lock in previously missed methods

For the same reason that recv(), send(), accept() and connect() need to
acquire the lock, all the other operations need to as well.

This also shows a design limitation in this approach w.r.t.
maintainability (silent failure when new operations are added); it would
be preferable to either reproduce the windows_events implementation in
its entirety, or use proxying via composition to ensure methods not made
thread-safe cannot be invoked.

Finally, the _poll implementation seems to have further issues: it is
not clear to me that the _stopped_serving handling is thread-safe,
_unregistered is not handled, and it seems preferable not to hold the
lock when calling GetQueuedCompletionStatus.
---
 qasync/_windows.py | 54 +++++++++++++++++++++++++++++++++++-----------
 1 file changed, 42 insertions(+), 12 deletions(-)

diff --git a/qasync/_windows.py b/qasync/_windows.py
index ec13ede..602044d 100644
--- a/qasync/_windows.py
+++ b/qasync/_windows.py
@@ -75,14 +75,56 @@ def close(self):
         self._logger.debug('Closing')
         super(_IocpProactor, self).close()
 
+    # Wrap all I/O submission methods to acquire the internal lock first; listed
+    # in the order they appear in the base class source code.
+
     def recv(self, conn, nbytes, flags=0):
         with QtCore.QMutexLocker(self._lock):
             return super(_IocpProactor, self).recv(conn, nbytes, flags)
 
+    def recv_into(self, conn, buf, flags=0):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self).recv_into(conn, buf, flags)
+
+    def recvfrom(self, conn, nbytes, flags=0):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self).recvfrom(conn, nbytes, flags)
+
+    def recvfrom_into(self, conn, buf, flags=0):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self).recvfrom_into(conn, buf, flags)
+
+    def sendto(self, conn, buf, flags=0, addr=None):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self).sendto(conn, buf, flags, addr)
+
     def send(self, conn, buf, flags=0):
         with QtCore.QMutexLocker(self._lock):
             return super(_IocpProactor, self).send(conn, buf, flags)
 
+    def accept(self, listener):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self).accept(listener)
+
+    def connect(self, conn, address):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self).connect(conn, address)
+
+    def sendfile(self, sock, file, offset, count):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self).sendfile(sock, file, offset, count)
+
+    def accept_pipe(self, pipe):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self).accept_pipe(pipe)
+
+    # connect_pipe() does not actually use the delayed completion machinery.
+
+    # This takes care of wait_for_handle() too.
+    def _wait_for_handle(self, handle, timeout, _is_cancel):
+        with QtCore.QMutexLocker(self._lock):
+            return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel)
+
     def _poll(self, timeout=None):
         """Override in order to handle events in a threadsafe manner."""
         if timeout is None:
@@ -123,18 +165,6 @@ def _poll(self, timeout=None):
 
                 ms = 0
 
-    def _wait_for_handle(self, handle, timeout, _is_cancel):
-        with QtCore.QMutexLocker(self._lock):
-            return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel)
-
-    def accept(self, listener):
-        with QtCore.QMutexLocker(self._lock):
-            return super(_IocpProactor, self).accept(listener)
-
-    def connect(self, conn, address):
-        with QtCore.QMutexLocker(self._lock):
-            return super(_IocpProactor, self).connect(conn, address)
-
 
 @with_logger
 class _EventWorker(QtCore.QThread):
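
Note (reviewer illustration, not part of the patch): the commit message
suggests "proxying via composition" as the more maintainable alternative.
A minimal sketch of that idea follows, assuming an explicit allowlist of
reviewed methods; the class name _LockedProactorProxy is hypothetical, and
threading.Lock stands in for QtCore.QMutex only to keep the sketch
self-contained.

    import threading


    class _LockedProactorProxy:
        # Only methods on this allowlist are forwarded, and always under the
        # lock. A submission method added to IocpProactor in a future Python
        # release would raise AttributeError here instead of silently running
        # without the lock.
        _WRAPPED = frozenset({
            "recv", "recv_into", "recvfrom", "recvfrom_into", "sendto", "send",
            "accept", "connect", "sendfile", "accept_pipe", "_wait_for_handle",
        })

        def __init__(self, proactor):
            self._proactor = proactor
            self._lock = threading.Lock()  # stand-in for QtCore.QMutex

        def __getattr__(self, name):
            if name not in self._WRAPPED:
                raise AttributeError(
                    f"{name!r} has not been reviewed for thread-safety"
                )
            method = getattr(self._proactor, name)

            def locked_call(*args, **kwargs):
                # Serialize every forwarded submission with the shared lock.
                with self._lock:
                    return method(*args, **kwargs)

            return locked_call

With this shape, unreviewed operations fail loudly at the call site, which is
the failure mode the commit message argues for.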
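
Note (reviewer illustration, not part of the patch): on the last point, not
holding the lock while calling GetQueuedCompletionStatus usually means
blocking with the mutex released and re-acquiring it only to publish results.
A rough sketch of that general shape, where _wait_for_completion() is a
hypothetical stand-in for the blocking OS call:

    import threading
    from collections import deque


    class _PollShapeSketch:
        """Illustrative only: the lock guards shared state, not the wait."""

        def __init__(self):
            self._lock = threading.Lock()  # guards self._events only
            self._events = deque()

        def _wait_for_completion(self, ms):
            # Hypothetical stand-in for the blocking completion wait;
            # returns an event or None on timeout.
            raise NotImplementedError

        def poll_once(self, ms):
            # Block with the lock released so submitters are never stalled
            # behind a potentially long wait.
            event = self._wait_for_completion(ms)
            if event is None:
                return
            # Take the lock only for the brief shared-state update.
            with self._lock:
                self._events.append(event)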