diff --git a/tests/conftest.py b/tests/conftest.py index e52683a..acb8c38 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,15 @@ -# © 2018 Gerard Marull-Paretas -# © 2014 Mark Harviston -# © 2014 Arve Knudsen -# BSD License +""" +Copyright (c) 2018 Gerard Marull-Paretas +Copyright (c) 2014 Mark Harviston +Copyright (c) 2014 Arve Knudsen + +BSD License +""" -import os import logging -from pytest import fixture +import os +from pytest import fixture logging.basicConfig( level=logging.DEBUG, format="%(levelname)s\t%(filename)s:%(lineno)s %(message)s" diff --git a/tests/test_qeventloop.py b/tests/test_qeventloop.py index be4d216..5ca86d9 100644 --- a/tests/test_qeventloop.py +++ b/tests/test_qeventloop.py @@ -5,13 +5,13 @@ import asyncio import ctypes -import threading import logging import multiprocessing import os import socket import subprocess import sys +import threading import time from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor @@ -809,6 +809,61 @@ async def mycoro(): assert "seconds" in msg +def test_run_until_complete_returns_future_result(loop): + async def coro(): + await asyncio.sleep(0) + return 42 + + assert loop.run_until_complete(asyncio.wait_for(coro(), timeout=1)) == 42 + + +def test_run_forever_custom_exit_code(loop, application): + if hasattr(application, "exec"): + orig_exec = application.exec + application.exec = lambda: 42 + try: + assert loop.run_forever() == 42 + finally: + application.exec = orig_exec + else: + orig_exec = application.exec_ + application.exec_ = lambda: 42 + try: + assert loop.run_forever() == 42 + finally: + application.exec_ = orig_exec + + +def test_qeventloop_in_qthread(): + class CoroutineExecutorThread(qasync.QtCore.QThread): + def __init__(self, coro): + super().__init__() + self.coro = coro + self.loop = None + + def run(self): + self.loop = qasync.QEventLoop(self) + asyncio.set_event_loop(self.loop) + asyncio.run(self.coro) + + def join(self): + self.loop.stop() + self.loop.close() + self.wait() + + event = threading.Event() + + async def coro(): + await asyncio.sleep(0.1) + event.set() + + thread = CoroutineExecutorThread(coro()) + thread.start() + + assert event.wait(timeout=1), "Coroutine did not execute successfully" + + thread.join() # Ensure thread cleanup + def teardown_module(module): """ Remove handlers from all loggers