From dc3a3fe713b6d719bb566de86d47c1659d794b86 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Tue, 15 Jul 2025 22:35:56 +1200 Subject: [PATCH 1/5] gh-136655: ensure cancelled futures are notified on process pool shutdown At present when a process pool executor shuts down it is cancelling pending work items, but failing to notify any waiting threads. Fix this. See also gh-109934, which is a similar bug in the thread pool executor. --- Lib/concurrent/futures/process.py | 4 ++- Lib/test/test_concurrent_futures/executor.py | 32 +++++++++++++++++++ Lib/test/test_concurrent_futures/util.py | 15 +++++++++ ...-10-13-13-05-15.gh-issue-136655.R8fBtC.rst | 2 ++ 4 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 Misc/NEWS.d/next/Library/2025-10-13-13-05-15.gh-issue-136655.R8fBtC.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index a14650bf5fa47c..0004edf760701f 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -518,7 +518,9 @@ def flag_executor_shutting_down(self): # to only have futures that are currently running. new_pending_work_items = {} for work_id, work_item in self.pending_work_items.items(): - if not work_item.future.cancel(): + if work_item.future.cancel(): + work_item.future.set_running_or_notify_cancel() + else: new_pending_work_items[work_id] = work_item self.pending_work_items = new_pending_work_items # Drain work_ids_queue since we no longer need to diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index a37c4d45f07b17..00687ae6de0513 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -247,3 +247,35 @@ def test_swallows_falsey_exceptions(self): msg = 'lenlen' with self.assertRaisesRegex(FalseyLenException, msg): self.executor.submit(raiser, FalseyLenException, msg).result() + + def test_shutdown_notifies_cancelled_futures(self): + + # TODO: remove when gh-109934 is fixed + if self.executor_type is futures.ThreadPoolExecutor: + self.skipTest("gh-109934: skipping thread pool executor") + + # gh-136655: ensure cancelled futures are notified + count = self.worker_count * 2 + barrier = self.create_barrier(self.worker_count + 1) + with self.executor as exec: + fs = [exec.submit(blocking_raiser, + barrier if index < self.worker_count else None) + for index in range(count)] + + exec.shutdown(wait=False, cancel_futures=True) + try: + barrier.wait() + except threading.BrokenBarrierError: + pass + + for future in fs: + self.assertRaises( + (FalseyBoolException, futures.CancelledError, threading.BrokenBarrierError), + future.result) + + self.assertIn('CANCELLED_AND_NOTIFIED', [f._state for f in fs]) + +def blocking_raiser(barrier=None): + if barrier is not None: + barrier.wait(1) + raise FalseyBoolException() diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index 2a9e55152b82d5..e25e15ced61948 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -80,6 +80,9 @@ def get_context(self): class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor + def create_barrier(self, count): + return threading.Barrier(count) + def create_event(self): return threading.Event() @@ -88,6 +91,9 @@ def create_event(self): class InterpreterPoolMixin(ExecutorMixin): executor_type = futures.InterpreterPoolExecutor + def create_barrier(self, count): + self.skipTest("InterpreterPoolExecutor doesn't support barriers") + def create_event(self): self.skipTest("InterpreterPoolExecutor doesn't support events") @@ -107,6 +113,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_barrier(self, count): + return self.manager.Barrier(count) + def create_event(self): return self.manager.Event() @@ -122,6 +131,9 @@ def get_context(self): self.skipTest("ProcessPoolExecutor unavailable on this system") return super().get_context() + def create_barrier(self, count): + return self.manager.Barrier(count) + def create_event(self): return self.manager.Event() @@ -141,6 +153,9 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() + def create_barrier(self, count): + return self.manager.Barrier(count) + def create_event(self): return self.manager.Event() diff --git a/Misc/NEWS.d/next/Library/2025-10-13-13-05-15.gh-issue-136655.R8fBtC.rst b/Misc/NEWS.d/next/Library/2025-10-13-13-05-15.gh-issue-136655.R8fBtC.rst new file mode 100644 index 00000000000000..4a987a4f694987 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-10-13-13-05-15.gh-issue-136655.R8fBtC.rst @@ -0,0 +1,2 @@ ++Ensure :class:`concurrent.futures.ProcessPoolExecutor` notifies any futures +it cancels on shutdown. From dc48587fcfd36f88cae25b70b6a8e7d6ee20cc5b Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Mon, 13 Oct 2025 14:51:41 +1300 Subject: [PATCH 2/5] Use default timeout for barrier in test --- Lib/test/test_concurrent_futures/executor.py | 4 ++-- Lib/test/test_concurrent_futures/util.py | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 00687ae6de0513..3475cff4eabfd9 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -256,7 +256,7 @@ def test_shutdown_notifies_cancelled_futures(self): # gh-136655: ensure cancelled futures are notified count = self.worker_count * 2 - barrier = self.create_barrier(self.worker_count + 1) + barrier = self.create_barrier(self.worker_count + 1, timeout=1) with self.executor as exec: fs = [exec.submit(blocking_raiser, barrier if index < self.worker_count else None) @@ -277,5 +277,5 @@ def test_shutdown_notifies_cancelled_futures(self): def blocking_raiser(barrier=None): if barrier is not None: - barrier.wait(1) + barrier.wait() raise FalseyBoolException() diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index e25e15ced61948..1e3e350bef96c4 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -80,8 +80,8 @@ def get_context(self): class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor - def create_barrier(self, count): - return threading.Barrier(count) + def create_barrier(self, count, **kwargs): + return threading.Barrier(count, **kwargs) def create_event(self): return threading.Event() @@ -91,7 +91,7 @@ def create_event(self): class InterpreterPoolMixin(ExecutorMixin): executor_type = futures.InterpreterPoolExecutor - def create_barrier(self, count): + def create_barrier(self, count, **kwargs): self.skipTest("InterpreterPoolExecutor doesn't support barriers") def create_event(self): @@ -113,8 +113,8 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() - def create_barrier(self, count): - return self.manager.Barrier(count) + def create_barrier(self, count, **kwargs): + return self.manager.Barrier(count, **kwargs) def create_event(self): return self.manager.Event() @@ -131,8 +131,8 @@ def get_context(self): self.skipTest("ProcessPoolExecutor unavailable on this system") return super().get_context() - def create_barrier(self, count): - return self.manager.Barrier(count) + def create_barrier(self, count, **kwargs): + return self.manager.Barrier(count, **kwargs) def create_event(self): return self.manager.Event() @@ -153,8 +153,8 @@ def get_context(self): self.skipTest("TSAN doesn't support threads after fork") return super().get_context() - def create_barrier(self, count): - return self.manager.Barrier(count) + def create_barrier(self, count, **kwargs): + return self.manager.Barrier(count, **kwargs) def create_event(self): return self.manager.Event() From f10dbb9b539f54be328fe8c39a7b7f5cc96235fa Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Thu, 16 Oct 2025 12:04:37 +1300 Subject: [PATCH 3/5] Silence fork-in-thread warnings, incidentally fixing a mysterious and prima facie unrelated unit test failure/hang. --- Lib/test/test_concurrent_futures/executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 3475cff4eabfd9..80ee7d5022e92b 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -248,6 +248,7 @@ def test_swallows_falsey_exceptions(self): with self.assertRaisesRegex(FalseyLenException, msg): self.executor.submit(raiser, FalseyLenException, msg).result() + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_shutdown_notifies_cancelled_futures(self): # TODO: remove when gh-109934 is fixed From 32cfb298a9750d028939d531fef66298c283bec0 Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Fri, 17 Oct 2025 13:12:02 +1300 Subject: [PATCH 4/5] Address reviewer feedback --- Lib/concurrent/futures/process.py | 1 + Lib/test/test_concurrent_futures/executor.py | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 0004edf760701f..2eff77f04fb921 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -519,6 +519,7 @@ def flag_executor_shutting_down(self): new_pending_work_items = {} for work_id, work_item in self.pending_work_items.items(): if work_item.future.cancel(): + # gh-136655: ensure cancelled futures are notified work_item.future.set_running_or_notify_cancel() else: new_pending_work_items[work_id] = work_item diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 80ee7d5022e92b..537e3c9119aca9 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -39,6 +39,16 @@ def __len__(self): return 0 +class CancelNotifyTestException(Exception): + pass + + +def blocking_raiser(barrier=None): + if barrier is not None: + barrier.wait() + raise CancelNotifyTestException() + + class ExecutorTest: # Executor.shutdown() and context manager usage is tested by @@ -250,6 +260,7 @@ def test_swallows_falsey_exceptions(self): @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_shutdown_notifies_cancelled_futures(self): + self.assertGreater(self.worker_count, 1) # TODO: remove when gh-109934 is fixed if self.executor_type is futures.ThreadPoolExecutor: @@ -271,12 +282,7 @@ def test_shutdown_notifies_cancelled_futures(self): for future in fs: self.assertRaises( - (FalseyBoolException, futures.CancelledError, threading.BrokenBarrierError), + (CancelNotifyTestException, futures.CancelledError, threading.BrokenBarrierError), future.result) self.assertIn('CANCELLED_AND_NOTIFIED', [f._state for f in fs]) - -def blocking_raiser(barrier=None): - if barrier is not None: - barrier.wait() - raise FalseyBoolException() From 98276628f034a4e3116e8df44685c1464278a09b Mon Sep 17 00:00:00 2001 From: Duane Griffin Date: Fri, 17 Oct 2025 23:24:47 +1300 Subject: [PATCH 5/5] Skip instead of failing if assumptions are invalid --- Lib/test/test_concurrent_futures/executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index 537e3c9119aca9..cbad521ea3cf81 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -260,7 +260,8 @@ def test_swallows_falsey_exceptions(self): @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_shutdown_notifies_cancelled_futures(self): - self.assertGreater(self.worker_count, 1) + if self.worker_count < 2: + self.skipTest("test requires more than one worker") # TODO: remove when gh-109934 is fixed if self.executor_type is futures.ThreadPoolExecutor: