Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Python) Fixed occasional pipeline stuckness that was affecting Python 3.11 users ([#33966](https://github.com/apache/beam/issues/33966)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
17 changes: 15 additions & 2 deletions sdks/python/apache_beam/runners/worker/worker_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

"""Worker status api handler for reporting SDK harness debug info."""

import gc
import logging
import queue
import sys
Expand Down Expand Up @@ -53,11 +54,23 @@
LOG_LULL_FULL_THREAD_DUMP_LULL_S = 20 * 60


def _current_frames():
# Work around https://github.com/python/cpython/issues/106883
if (sys.version_info.minor == 11 and sys.version_info.major == 3 and
gc.isenabled()):
gc.disable()
frames = sys._current_frames() # pylint: disable=protected-access
gc.enable()
return frames
else:
return sys._current_frames() # pylint: disable=protected-access


def thread_dump():
"""Get a thread dump for the current SDK worker harness. """
# deduplicate threads with same stack trace
stack_traces = defaultdict(list)
frames = sys._current_frames() # pylint: disable=protected-access
frames = _current_frames()

for t in threading.enumerate():
try:
Expand Down Expand Up @@ -269,7 +282,7 @@ def _log_lull_sampler_info(self, sampler_info, instruction):
def _get_stack_trace(self, sampler_info):
exec_thread = getattr(sampler_info, 'tracked_thread', None)
if exec_thread is not None:
thread_frame = sys._current_frames().get(exec_thread.ident) # pylint: disable=protected-access
thread_frame = _current_frames().get(exec_thread.ident)
return '\n'.join(
traceback.format_stack(thread_frame)) if thread_frame else ''
else:
Expand Down
Loading