From 6b87f99fa797939f1fd5cb882fa7027566dd3e76 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Mon, 19 Feb 2024 19:55:28 -0800 Subject: [PATCH 1/3] Add main_thread_only execmodel In order to prevent tasks from running in a non-main thread, wait for the previous task inside _try_send_to_primary_thread, then schedule the next task. Add a main_thread_only execmodel to distinguish this new behavior from the existing thread execmodel, since users of the thread execmodel expect that tasks can run in multiple threads concurrently. If concurrent remote_exec requests are submitted for the main_thread_only execmodel, then the channel will raise a RemoteError with this message: concurrent remote_exec would cause deadlock for main_thread_only execmodel In order for main_thread_only users to avoid this error, remote_exec callers must use the returned channel to wait for a task to complete before they call remote_exec again, as demonstrated in test_assert_main_thread_only. Testing of main_thread_only with pytest-xdist has shown that this behavior is compatible with the existing pytest-xdist usage because it never calls remote_exec more than once per gateway. --- doc/basics.rst | 10 ++--- src/execnet/gateway_base.py | 59 +++++++++++++++++++++++++-- src/execnet/multi.py | 2 +- testing/conftest.py | 8 ++-- testing/test_gateway.py | 79 +++++++++++++++++++++++++++++++++++++ testing/test_multi.py | 6 ++- testing/test_termination.py | 8 +++- testing/test_threadpool.py | 4 +- 8 files changed, 157 insertions(+), 19 deletions(-) diff --git a/doc/basics.rst b/doc/basics.rst index aa6dabaf..f0eebd85 100644 --- a/doc/basics.rst +++ b/doc/basics.rst @@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()`` yourself and specify a larger or not timeout. -threading models: gevent, eventlet, thread -=========================================== +threading models: gevent, eventlet, thread, main_thread_only +==================================================================== .. versionadded:: 1.2 (status: experimental!) -execnet supports "thread", "eventlet" and "gevent" as thread models -on each of the two sides. You need to decide which model to use -before you create any gateways:: +execnet supports "main_thread_only", "thread", "eventlet" and "gevent" +as thread models on each of the two sides. You need to decide which +model to use before you create any gateways:: # content of threadmodel.py import execnet diff --git a/src/execnet/gateway_base.py b/src/execnet/gateway_base.py index c055d64f..954423de 100644 --- a/src/execnet/gateway_base.py +++ b/src/execnet/gateway_base.py @@ -134,6 +134,10 @@ def Event(self): return threading.Event() +class MainThreadOnlyExecModel(ThreadExecModel): + backend = "main_thread_only" + + class EventletExecModel(ExecModel): backend = "eventlet" @@ -254,6 +258,8 @@ def get_execmodel(backend): return backend if backend == "thread": return ThreadExecModel() + elif backend == "main_thread_only": + return MainThreadOnlyExecModel() elif backend == "eventlet": return EventletExecModel() elif backend == "gevent": @@ -322,7 +328,7 @@ def __init__(self, execmodel, hasprimary=False): self._shuttingdown = False self._waitall_events = [] if hasprimary: - if self.execmodel.backend != "thread": + if self.execmodel.backend not in ("thread", "main_thread_only"): raise ValueError("hasprimary=True requires thread model") self._primary_thread_task_ready = self.execmodel.Event() else: @@ -332,7 +338,7 @@ def integrate_as_primary_thread(self): """integrate the thread with which we are called as a primary thread for executing functions triggered with spawn(). """ - assert self.execmodel.backend == "thread", self.execmodel + assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel primary_thread_task_ready = self._primary_thread_task_ready # interacts with code at REF1 while 1: @@ -345,7 +351,11 @@ def integrate_as_primary_thread(self): with self._running_lock: if self._shuttingdown: break - primary_thread_task_ready.clear() + # Only clear if _try_send_to_primary_thread has not + # yet set the next self._primary_thread_task reply + # after waiting for this one to complete. + if reply is self._primary_thread_task: + primary_thread_task_ready.clear() def trigger_shutdown(self): with self._running_lock: @@ -376,6 +386,19 @@ def _try_send_to_primary_thread(self, reply): # wake up primary thread primary_thread_task_ready.set() return True + elif ( + self.execmodel.backend == "main_thread_only" + and self._primary_thread_task is not None + ): + self._primary_thread_task.waitfinish() + self._primary_thread_task = reply + # wake up primary thread (it's okay if this is already set + # because we waited for the previous task to finish above + # and integrate_as_primary_thread will not clear it when + # it enters self._running_lock if it detects that a new + # task is available) + primary_thread_task_ready.set() + return True return False def spawn(self, func, *args, **kwargs): @@ -857,6 +880,9 @@ def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): ENDMARKER = object() INTERRUPT_TEXT = "keyboard-interrupted" +MAIN_THREAD_ONLY_DEADLOCK_TEXT = ( + "concurrent remote_exec would cause deadlock for main_thread_only execmodel" +) class ChannelFactory: @@ -1105,6 +1131,20 @@ def join(self, timeout=None): class WorkerGateway(BaseGateway): def _local_schedulexec(self, channel, sourcetask): + if self._execpool.execmodel.backend == "main_thread_only": + # It's necessary to wait for a short time in order to ensure + # that we do not report a false-positive deadlock error, since + # channel close does not elicit a response that would provide + # a guarantee to remote_exec callers that the previous task + # has released the main thread. If the timeout expires then it + # should be practically impossible to report a false-positive. + if not self._executetask_complete.wait(timeout=1): + channel.close(MAIN_THREAD_ONLY_DEADLOCK_TEXT) + return + # It's only safe to clear here because the above wait proves + # that there is not a previous task about to set it again. + self._executetask_complete.clear() + sourcetask = loads_internal(sourcetask) self._execpool.spawn(self.executetask, (channel, sourcetask)) @@ -1132,8 +1172,14 @@ def serve(self): def trace(msg): self._trace("[serve] " + msg) - hasprimary = self.execmodel.backend == "thread" + hasprimary = self.execmodel.backend in ("thread", "main_thread_only") self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary) + self._executetask_complete = None + if self.execmodel.backend == "main_thread_only": + self._executetask_complete = self.execmodel.Event() + # Initialize state to indicate that there is no previous task + # executing so that we don't need a separate flag to track this. + self._executetask_complete.set() trace("spawning receiver thread") self._initreceive() try: @@ -1176,6 +1222,11 @@ def executetask(self, item): return self._trace("ignoring EOFError because receiving finished") channel.close() + if self._executetask_complete is not None: + # Indicate that this task has finished executing, meaning + # that there is no possibility of it triggering a deadlock + # for the next spawn call. + self._executetask_complete.set() # diff --git a/src/execnet/multi.py b/src/execnet/multi.py index 7629ddb3..c63a57e8 100644 --- a/src/execnet/multi.py +++ b/src/execnet/multi.py @@ -107,7 +107,7 @@ def makegateway(self, spec=None): id= specifies the gateway id python= specifies which python interpreter to execute - execmodel=model 'thread', 'eventlet', 'gevent' model for execution + execmodel=model 'thread', 'main_thread_only', 'eventlet', 'gevent' model for execution chdir= specifies to which directory to change nice= specifies process priority of new process env:NAME=value specifies a remote environment variable setting. diff --git a/testing/conftest.py b/testing/conftest.py index 9ec44d30..cd44f92d 100644 --- a/testing/conftest.py +++ b/testing/conftest.py @@ -124,7 +124,7 @@ def anypython(request): pytest.skip(f"no {name} found") if "execmodel" in request.fixturenames and name != "sys.executable": backend = request.getfixturevalue("execmodel").backend - if backend != "thread": + if backend not in ("thread", "main_thread_only"): pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}") return executable @@ -173,9 +173,11 @@ def gw(request, execmodel, group): return gw -@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session") +@pytest.fixture( + params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session" +) def execmodel(request): - if request.param != "thread": + if request.param not in ("thread", "main_thread_only"): pytest.importorskip(request.param) if request.param in ("eventlet", "gevent") and sys.platform == "win32": pytest.xfail(request.param + " does not work on win32") diff --git a/testing/test_gateway.py b/testing/test_gateway.py index 809a13d9..ee4ce375 100644 --- a/testing/test_gateway.py +++ b/testing/test_gateway.py @@ -525,3 +525,82 @@ def sendback(channel): if interleave_getstatus: print(gw.remote_status()) assert ch.receive(timeout=0.5) == 1234 + + +def test_assert_main_thread_only(execmodel, makegateway): + if execmodel.backend != "main_thread_only": + pytest.skip("can only run with main_thread_only") + + gw = makegateway(spec=f"execmodel={execmodel.backend}//popen") + + try: + # Submit multiple remote_exec requests in quick succession and + # assert that all tasks execute in the main thread. It is + # necessary to call receive on each channel before the next + # remote_exec call, since the channel will raise an error if + # concurrent remote_exec requests are submitted as in + # test_main_thread_only_concurrent_remote_exec_deadlock. + for i in range(10): + ch = gw.remote_exec( + """ + import time, threading + time.sleep(0.02) + channel.send(threading.current_thread() is threading.main_thread()) + """ + ) + + try: + res = ch.receive() + finally: + ch.close() + # This doesn't actually block because we closed + # the channel already, but it does check for remote + # errors and raise them. + ch.waitclose() + if res is not True: + pytest.fail("remote raised\n%s" % res) + finally: + gw.exit() + gw.join() + + +def test_main_thread_only_concurrent_remote_exec_deadlock(execmodel, makegateway): + if execmodel.backend != "main_thread_only": + pytest.skip("can only run with main_thread_only") + + gw = makegateway(spec=f"execmodel={execmodel.backend}//popen") + channels = [] + try: + # Submit multiple remote_exec requests in quick succession and + # assert that MAIN_THREAD_ONLY_DEADLOCK_TEXT is raised if + # concurrent remote_exec requests are submitted for the + # main_thread_only execmodel (as compensation for the lack of + # back pressure in remote_exec calls which do not attempt to + # block until the remote main thread is idle). + for i in range(2): + channels.append( + gw.remote_exec( + """ + import threading + channel.send(threading.current_thread() is threading.main_thread()) + # Wait forever, ensuring that the deadlock case triggers. + channel.gateway.execmodel.Event().wait() + """ + ) + ) + + expected_results = ( + True, + execnet.gateway_base.MAIN_THREAD_ONLY_DEADLOCK_TEXT, + ) + for expected, ch in zip(expected_results, channels): + try: + res = ch.receive() + except execnet.RemoteError as e: + res = e.formatted + assert res == expected + finally: + for ch in channels: + ch.close() + gw.exit() + gw.join() diff --git a/testing/test_multi.py b/testing/test_multi.py index 79861b11..3996a964 100644 --- a/testing/test_multi.py +++ b/testing/test_multi.py @@ -223,8 +223,9 @@ def test_terminate_with_proxying(self): group.terminate(1.0) +@pytest.mark.xfail(reason="active_count() has been broken for some time") def test_safe_terminate(execmodel): - if execmodel.backend != "threading": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.xfail( "execution model %r does not support task count" % execmodel.backend ) @@ -246,8 +247,9 @@ def kill(): assert execmodel.active_count() == active +@pytest.mark.xfail(reason="active_count() has been broken for some time") def test_safe_terminate2(execmodel): - if execmodel.backend != "threading": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.xfail( "execution model %r does not support task count" % execmodel.backend ) diff --git a/testing/test_termination.py b/testing/test_termination.py index 282c1979..5da516c4 100644 --- a/testing/test_termination.py +++ b/testing/test_termination.py @@ -36,7 +36,7 @@ def doit(): def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.xfail("test and execnet not compatible to greenlets yet") gw = makegateway("popen") q = execmodel.queue.Queue() @@ -97,8 +97,12 @@ def test_close_initiating_remote_no_error(testdir, anypython): def test_terminate_implicit_does_trykill(testdir, anypython, capfd, pool): - if pool.execmodel != "thread": + if pool.execmodel.backend not in ("thread", "main_thread_only"): pytest.xfail("only os threading model supported") + if sys.version_info >= (3, 12): + pytest.xfail( + "since python3.12 this test triggers RuntimeError: can't create new thread at interpreter shutdown" + ) p = testdir.makepyfile( """ import sys diff --git a/testing/test_threadpool.py b/testing/test_threadpool.py index 4d1edd8c..0162e2ea 100644 --- a/testing/test_threadpool.py +++ b/testing/test_threadpool.py @@ -164,7 +164,7 @@ def wait_then_put(): def test_primary_thread_integration(execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): with pytest.raises(ValueError): WorkerPool(execmodel=execmodel, hasprimary=True) return @@ -188,7 +188,7 @@ def func(): def test_primary_thread_integration_shutdown(execmodel): - if execmodel.backend != "thread": + if execmodel.backend not in ("thread", "main_thread_only"): pytest.skip("can only run with threading") pool = WorkerPool(execmodel=execmodel, hasprimary=True) queue = execmodel.queue.Queue() From 8f7f3a0ea139bbefee194c00ab2d3edbc48ee9ec Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Mon, 19 Feb 2024 19:55:52 -0800 Subject: [PATCH 2/3] Popen2IO: Fix "Bad file descriptor" error Fix init_popen_io to leave behind a sane state (0 and 1 file descriptors open), in order to prevent "Bad file descriptor" errors. Also fix test_stdouterrin_setnull to restore stdin state, while relying on capfd to do this for stdout and stderr. The "Bad file descriptor" error doesn't trigger reliably because it's triggered by garbage collection of the Popen2IO instance returned from init_popen_io as shown in this job: https://github.com/pytest-dev/execnet/actions/runs/7955155978/job/21716386705?pr=243 =================================== FAILURES =================================== __________________ test_stdouterrin_setnull[main_thread_only] __________________ execmodel = capfd = <_pytest.capture.CaptureFixture object at 0x7fba8c333990> @pytest.mark.skipif("not hasattr(os, 'dup')") def test_stdouterrin_setnull(execmodel, capfd): gateway_base.init_popen_io(execmodel) os.write(1, b"hello") > os.read(0, 1) E OSError: [Errno 9] Bad file descriptor testing/test_basics.py:254: OSError --- src/execnet/gateway_base.py | 20 +++++++++++--------- testing/test_basics.py | 25 +++++++++++++++++++------ 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/execnet/gateway_base.py b/src/execnet/gateway_base.py index 954423de..2e2859f9 100644 --- a/src/execnet/gateway_base.py +++ b/src/execnet/gateway_base.py @@ -61,7 +61,7 @@ def sleep(self, delay): raise NotImplementedError() @abc.abstractmethod - def fdopen(self, fd, mode, bufsize=1): + def fdopen(self, fd, mode, bufsize=1, closefd=True): raise NotImplementedError() @abc.abstractmethod @@ -113,10 +113,10 @@ def start(self, func, args=()): return _thread.start_new_thread(func, args) - def fdopen(self, fd, mode, bufsize=1): + def fdopen(self, fd, mode, bufsize=1, closefd=True): import os - return os.fdopen(fd, mode, bufsize, encoding="utf-8") + return os.fdopen(fd, mode, bufsize, encoding="utf-8", closefd=closefd) def Lock(self): import threading @@ -174,10 +174,10 @@ def start(self, func, args=()): return eventlet.spawn_n(func, *args) - def fdopen(self, fd, mode, bufsize=1): + def fdopen(self, fd, mode, bufsize=1, closefd=True): import eventlet.green.os - return eventlet.green.os.fdopen(fd, mode, bufsize) + return eventlet.green.os.fdopen(fd, mode, bufsize, closefd=closefd) def Lock(self): import eventlet.green.threading @@ -231,11 +231,11 @@ def start(self, func, args=()): return gevent.spawn(func, *args) - def fdopen(self, fd, mode, bufsize=1): + def fdopen(self, fd, mode, bufsize=1, closefd=True): # XXX import gevent.fileobject - return gevent.fileobject.FileObjectThread(fd, mode, bufsize) + return gevent.fileobject.FileObjectThread(fd, mode, bufsize, closefd=closefd) def Lock(self): import gevent.lock @@ -1682,8 +1682,10 @@ def init_popen_io(execmodel): os.dup2(fd, 2) os.close(fd) io = Popen2IO(stdout, stdin, execmodel) - sys.stdin = execmodel.fdopen(0, "r", 1) - sys.stdout = execmodel.fdopen(1, "w", 1) + # Use closefd=False since 0 and 1 are shared with + # sys.__stdin__ and sys.__stdout__. + sys.stdin = execmodel.fdopen(0, "r", 1, closefd=False) + sys.stdout = execmodel.fdopen(1, "w", 1, closefd=False) return io diff --git a/testing/test_basics.py b/testing/test_basics.py index 321e18f1..26816f3e 100644 --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -249,12 +249,25 @@ class Arg: @pytest.mark.skipif("not hasattr(os, 'dup')") def test_stdouterrin_setnull(execmodel, capfd): - gateway_base.init_popen_io(execmodel) - os.write(1, b"hello") - os.read(0, 1) - out, err = capfd.readouterr() - assert not out - assert not err + # Backup and restore stdin state, and rely on capfd to handle + # this for stdout and stderr. + orig_stdin = sys.stdin + orig_stdin_fd = os.dup(0) + try: + # The returned Popen2IO instance can be garbage collected + # prematurely since we don't hold a reference here, but we + # tolerate this because it is intended to leave behind a + # sane state afterwards. + gateway_base.init_popen_io(execmodel) + os.write(1, b"hello") + os.read(0, 1) + out, err = capfd.readouterr() + assert not out + assert not err + finally: + sys.stdin = orig_stdin + os.dup2(orig_stdin_fd, 0) + os.close(orig_stdin_fd) class PseudoChannel: From c5cef445df8ee98128de5a05a7fc06417f4c5d5d Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 22 Feb 2024 14:15:48 -0800 Subject: [PATCH 3/3] Add #243 to CHANGELOG --- CHANGELOG.rst | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a0f0d67a..dceb30c1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,3 +1,20 @@ +2.1.0 (UNRELEASED) +------------------ + +* `#243 `__: Added ``main_thread_only`` + execmodel which is derived from the thread execmodel and only executes ``remote_exec`` + calls in the main thread. + + Callers of ``remote_exec`` must use the returned channel to wait for a task to complete + before they call remote_exec again, otherwise the ``remote_exec`` call will fail with a + ``concurrent remote_exec would cause deadlock`` error. The main_thread_only execmodel + provides solutions for `#96 `__ and + `pytest-dev/pytest-xdist#620 `__ + (pending a new `pytest-xdist` release). + + Also fixed ``init_popen_io`` to use ``closefd=False`` for shared stdin and stdout file + descriptors, preventing ``Bad file descriptor`` errors triggered by test_stdouterrin_setnull. + 2.0.2 (2023-07-09) ------------------