diff --git a/qasync/_windows.py b/qasync/_windows.py index ec13ede..602044d 100644 --- a/qasync/_windows.py +++ b/qasync/_windows.py @@ -75,14 +75,56 @@ def close(self): self._logger.debug('Closing') super(_IocpProactor, self).close() + # Wrap all I/O submission methods to acquire the internal lock first; listed + # in the order they appear in the base class source code. + def recv(self, conn, nbytes, flags=0): with QtCore.QMutexLocker(self._lock): return super(_IocpProactor, self).recv(conn, nbytes, flags) + def recv_into(self, conn, buf, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).recv_into(conn, buf, flags) + + def recvfrom(self, conn, nbytes, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).recvfrom(conn, nbytes, flags) + + def recvfrom_into(self, conn, buf, flags=0): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).recvfrom_into(conn, buf, flags) + + def sendto(self, conn, buf, flags=0, addr=None): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).sendto(conn, buf, flags, addr) + def send(self, conn, buf, flags=0): with QtCore.QMutexLocker(self._lock): return super(_IocpProactor, self).send(conn, buf, flags) + def accept(self, listener): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).accept(listener) + + def connect(self, conn, address): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).connect(conn, address) + + def sendfile(self, sock, file, offset, count): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).sendfile(sock, file, offset, count) + + def accept_pipe(self, pipe): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self).accept_pipe(pipe) + + # connect_pipe() does not actually use the delayed completion machinery. + + # This takes care of wait_for_handle() too. + def _wait_for_handle(self, handle, timeout, _is_cancel): + with QtCore.QMutexLocker(self._lock): + return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel) + def _poll(self, timeout=None): """Override in order to handle events in a threadsafe manner.""" if timeout is None: @@ -123,18 +165,6 @@ def _poll(self, timeout=None): ms = 0 - def _wait_for_handle(self, handle, timeout, _is_cancel): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel) - - def accept(self, listener): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).accept(listener) - - def connect(self, conn, address): - with QtCore.QMutexLocker(self._lock): - return super(_IocpProactor, self).connect(conn, address) - @with_logger class _EventWorker(QtCore.QThread):