def _current_frames():
  """Return a snapshot of every thread's current stack frame.

  Thin wrapper around ``sys._current_frames()``. On Python 3.11 the garbage
  collector is switched off for the duration of the call to work around
  https://github.com/python/cpython/issues/106883, and is switched back on
  afterwards.

  The collector is only touched when it was enabled on entry, so a caller
  that has deliberately disabled GC keeps it disabled.

  Returns:
    The mapping produced by ``sys._current_frames()`` (thread identifier to
    that thread's topmost frame).
  """
  needs_workaround = sys.version_info[:2] == (3, 11) and gc.isenabled()
  if not needs_workaround:
    return sys._current_frames()  # pylint: disable=protected-access

  gc.disable()
  try:
    return sys._current_frames()  # pylint: disable=protected-access
  finally:
    # Restore GC even if the call above raises; otherwise the collector
    # would stay disabled for the remaining lifetime of the process.
    gc.enable()
""" # deduplicate threads with same stack trace stack_traces = defaultdict(list) - frames = sys._current_frames() # pylint: disable=protected-access + frames = _current_frames() for t in threading.enumerate(): try: @@ -269,7 +282,7 @@ def _log_lull_sampler_info(self, sampler_info, instruction): def _get_stack_trace(self, sampler_info): exec_thread = getattr(sampler_info, 'tracked_thread', None) if exec_thread is not None: - thread_frame = sys._current_frames().get(exec_thread.ident) # pylint: disable=protected-access + thread_frame = _current_frames().get(exec_thread.ident) return '\n'.join( traceback.format_stack(thread_frame)) if thread_frame else '' else: