diff --git a/Doc/library/profiling.sampling.rst b/Doc/library/profiling.sampling.rst
index 41cb254174dbb4..b5e6a2c7a0ed8e 100644
--- a/Doc/library/profiling.sampling.rst
+++ b/Doc/library/profiling.sampling.rst
@@ -342,6 +342,8 @@ The default configuration works well for most use cases:
      - Disabled
    * - Default for ``--subprocesses``
      - Disabled
+   * - Default for ``--blocking``
+     - Disabled (non-blocking sampling)
 
 
 Sampling interval and duration
@@ -392,6 +394,50 @@ This option is particularly useful when investigating concurrency issues or
 when work is distributed across a thread pool.
 
 
+.. _blocking-mode:
+
+Blocking mode
+-------------
+
+By default, Tachyon reads the target process's memory without stopping it.
+This non-blocking approach is ideal for most profiling scenarios because it
+imposes virtually zero overhead on the target application: the profiled
+program runs at full speed and is unaware it is being observed.
+
+However, non-blocking sampling can occasionally produce incomplete or
+inconsistent stack traces. This happens in applications with many generators
+or coroutines that rapidly switch between yield points, and in programs whose
+call stacks change so quickly that functions enter and exit during a single
+stack read. The reconstructed stack may then mix frames from different
+execution states, or describe a call chain that never actually existed.
+
+For these cases, the :option:`--blocking` option stops the target process during
+each sample::
+
+    python -m profiling.sampling run --blocking script.py
+    python -m profiling.sampling attach --blocking 12345
+
+When blocking mode is enabled, the profiler suspends the target process,
+reads its stack, then resumes it. This guarantees that each captured stack
+represents a real, consistent snapshot of what the process was doing at that
+instant. The trade-off is that the target process runs slower because it is
+repeatedly paused.
+
+.. 
warning::
+
+   Do not use very high sample rates (low ``--interval`` values) with blocking
+   mode. Suspending and resuming a process takes time, and if the sampling
+   interval is too short, the target will spend more time stopped than running.
+   For blocking mode, intervals of 1000 microseconds (1 millisecond) or higher
+   are recommended. The default 100 microsecond interval may cause noticeable
+   slowdown in the target application.
+
+Use blocking mode only when you observe inconsistent stacks in your profiles,
+particularly with generator-heavy or coroutine-heavy code. For most
+applications, the default non-blocking mode provides accurate results with
+virtually no impact on the target process.
+
+
 Special frames
 --------------
 
@@ -1383,6 +1429,13 @@ Sampling options
    Also profile subprocesses. Each subprocess gets its own profiler instance
    and output file. Incompatible with ``--live``.
 
+.. option:: --blocking
+
+   Pause the target process during each sample. This ensures consistent
+   stack traces at the cost of slowing down the target. Use with longer
+   intervals (1000 µs or higher) to minimize impact. See :ref:`blocking-mode`
+   for details.
+
 
 Mode options
 ------------
diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py
index 0403c75c80f043..ccd6e954d79698 100644
--- a/Lib/profiling/sampling/cli.py
+++ b/Lib/profiling/sampling/cli.py
@@ -347,6 +347,13 @@ def _add_sampling_options(parser):
         action="store_true",
         help="Also profile subprocesses. Each subprocess gets its own profiler and output file.",
     )
+    sampling_group.add_argument(
+        "--blocking",
+        action="store_true",
+        help="Stop all threads in target process before sampling to get consistent snapshots. "
+        "Uses task_suspend on macOS and ptrace on Linux. Adds overhead but ensures memory "
+        "reads are from a frozen state.",
+    )
 
 
 def _add_mode_options(parser):
@@ -585,6 +592,15 @@ def _validate_args(args, parser):
     if getattr(args, 'command', None) == "replay":
         return
 
+    # Warn about blocking mode with aggressive (but valid, non-zero) sampling intervals
+    if args.blocking and 0 < args.interval < 100:
+        print(
+            f"Warning: --blocking with a {args.interval} µs interval will stop all threads "
+            f"{1_000_000 // args.interval} times per second. "
+            "Consider using --interval 1000 or higher to reduce overhead.",
+            file=sys.stderr
+        )
+
     # Check if live mode is available
     if hasattr(args, 'live') and args.live and LiveStatsCollector is None:
         parser.error(
@@ -861,6 +877,7 @@ def _handle_attach(args):
         native=args.native,
         gc=args.gc,
         opcodes=args.opcodes,
+        blocking=args.blocking,
     )
     _handle_output(collector, args, args.pid, mode)
 
@@ -939,6 +956,7 @@ def _handle_run(args):
             native=args.native,
             gc=args.gc,
             opcodes=args.opcodes,
+            blocking=args.blocking,
         )
         _handle_output(collector, args, process.pid, mode)
     finally:
@@ -984,6 +1002,7 @@ def _handle_live_attach(args, pid):
         native=args.native,
         gc=args.gc,
         opcodes=args.opcodes,
+        blocking=args.blocking,
     )
 
 
@@ -1031,6 +1050,7 @@ def _handle_live_run(args):
             native=args.native,
             gc=args.gc,
             opcodes=args.opcodes,
+            blocking=args.blocking,
         )
     finally:
         # Clean up the subprocess
diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py
index c0c4c88d13cc2f..2fe022c85b0b31 100644
--- a/Lib/profiling/sampling/sample.py
+++ b/Lib/profiling/sampling/sample.py
@@ -1,4 +1,5 @@
 import _remote_debugging
+import contextlib
 import os
 import statistics
 import sys
@@ -7,7 +8,26 @@
 from collections import deque
 from _colorize import ANSIColors
 
+from .pstats_collector import PstatsCollector
+from .stack_collector import CollapsedStackCollector, FlamegraphCollector
+from .heatmap_collector import HeatmapCollector
+from .gecko_collector import GeckoCollector
 from .binary_collector import BinaryCollector
+
+
+@contextlib.contextmanager
+def _pause_threads(unwinder, blocking):
+    """Pause the target's threads for the body of the ``with`` block when *blocking* is true, always resuming on exit."""
+    if blocking:
+        unwinder.pause_threads()
+        try:
+            yield
+        finally:
+            unwinder.resume_threads()
+    else:
+        yield
+
+
 from .constants import (
     PROFILING_MODE_WALL,
     PROFILING_MODE_CPU,
@@ -25,12 +45,13 @@
 
 
 class SampleProfiler:
-    def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False):
+    def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False, blocking=False):
         self.pid = pid
         self.sample_interval_usec = sample_interval_usec
         self.all_threads = all_threads
         self.mode = mode  # Store mode for later use
         self.collect_stats = collect_stats
+        self.blocking = blocking
         try:
             self.unwinder = self._new_unwinder(native, gc, opcodes, skip_non_matching_threads)
         except RuntimeError as err:
@@ -60,12 +81,11 @@ def sample(self, collector, duration_sec=10, *, async_aware=False):
         running_time = 0
         num_samples = 0
         errors = 0
+        interrupted = False
         start_time = next_time = time.perf_counter()
         last_sample_time = start_time
         realtime_update_interval = 1.0  # Update every second
         last_realtime_update = start_time
-        interrupted = False
-
         try:
             while running_time < duration_sec:
                 # Check if live collector wants to stop
@@ -75,14 +95,15 @@ def sample(self, collector, duration_sec=10, *, async_aware=False):
                 current_time = time.perf_counter()
                 if next_time < current_time:
                     try:
-                        if async_aware == "all":
-                            stack_frames = self.unwinder.get_all_awaited_by()
-                        elif async_aware == "running":
-                            stack_frames = self.unwinder.get_async_stack_trace()
-                        else:
-                            stack_frames = self.unwinder.get_stack_trace()
-                        collector.collect(stack_frames)
+                        with _pause_threads(self.unwinder, self.blocking):
+                            if async_aware == "all":
+                                stack_frames = self.unwinder.get_all_awaited_by()
+                            elif async_aware == "running":
+                                stack_frames = self.unwinder.get_async_stack_trace()
+                            else:
+                                stack_frames = self.unwinder.get_stack_trace()
+                            collector.collect(stack_frames)
                     except ProcessLookupError:
                         duration_sec = current_time - start_time
                         break
                     except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
@@ -350,6 +371,7 @@ def sample(
     native=False,
     gc=True,
     opcodes=False,
+    blocking=False,
 ):
     """Sample a process using the provided collector.
 
@@ -365,6 +387,7 @@ def sample(
         native: Whether to include native frames
         gc: Whether to include GC frames
         opcodes: Whether to include opcode information
+        blocking: Whether to stop all threads before sampling for consistent snapshots
 
     Returns:
         The collector with collected samples
@@ -390,6 +413,7 @@ def sample(
         opcodes=opcodes,
         skip_non_matching_threads=skip_non_matching_threads,
         collect_stats=realtime_stats,
+        blocking=blocking,
     )
     profiler.realtime_stats = realtime_stats
 
@@ -411,6 +435,7 @@ def sample_live(
     native=False,
     gc=True,
     opcodes=False,
+    blocking=False,
 ):
     """Sample a process in live/interactive mode with curses TUI.
 
@@ -426,6 +451,7 @@ def sample_live( native: Whether to include native frames gc: Whether to include GC frames opcodes: Whether to include opcode information + blocking: Whether to stop all threads before sampling for consistent snapshots Returns: The collector with collected samples @@ -451,6 +477,7 @@ def sample_live( opcodes=opcodes, skip_non_matching_threads=skip_non_matching_threads, collect_stats=realtime_stats, + blocking=blocking, ) profiler.realtime_stats = realtime_stats diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index e298f1db4e2ed4..8837d3b729442c 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -2931,24 +2931,24 @@ def top(): "Test only runs on Linux with process_vm_readv support", ) def test_partial_stack_reuse(self): - """Test that unchanged bottom frames are reused when top changes (A→B→C to A→B→D).""" + """Test that unchanged parent frames are reused from cache when top frame moves.""" script_body = """\ - def func_c(): - sock.sendall(b"at_c") + def level4(): + sock.sendall(b"sync1") sock.recv(16) - - def func_d(): - sock.sendall(b"at_d") + sock.sendall(b"sync2") sock.recv(16) - def func_b(): - func_c() - func_d() + def level3(): + level4() - def func_a(): - func_b() + def level2(): + level3() + + def level1(): + level2() - func_a() + level1() """ with self._target_process(script_body) as ( @@ -2958,55 +2958,51 @@ def func_a(): ): unwinder = make_unwinder(cache_frames=True) - # Sample at C: stack is A→B→C - frames_c = self._sample_frames( + # Sample 1: level4 at first sendall + frames1 = self._sample_frames( client_socket, unwinder, - b"at_c", + b"sync1", b"ack", - {"func_a", "func_b", "func_c"}, + {"level1", "level2", "level3", "level4"}, ) - # Sample at D: stack is A→B→D (C returned, D called) - frames_d = self._sample_frames( + # Sample 2: level4 at second sendall (same stack, different line) + frames2 = self._sample_frames( client_socket, unwinder, - 
b"at_d", + b"sync2", b"done", - {"func_a", "func_b", "func_d"}, + {"level1", "level2", "level3", "level4"}, ) - self.assertIsNotNone(frames_c) - self.assertIsNotNone(frames_d) + self.assertIsNotNone(frames1) + self.assertIsNotNone(frames2) - # Find func_a and func_b frames in both samples def find_frame(frames, funcname): for f in frames: if f.funcname == funcname: return f return None - frame_a_in_c = find_frame(frames_c, "func_a") - frame_b_in_c = find_frame(frames_c, "func_b") - frame_a_in_d = find_frame(frames_d, "func_a") - frame_b_in_d = find_frame(frames_d, "func_b") - - self.assertIsNotNone(frame_a_in_c) - self.assertIsNotNone(frame_b_in_c) - self.assertIsNotNone(frame_a_in_d) - self.assertIsNotNone(frame_b_in_d) - - # The bottom frames (A, B) should be the SAME objects (cache reuse) - self.assertIs( - frame_a_in_c, - frame_a_in_d, - "func_a frame should be reused from cache", - ) - self.assertIs( - frame_b_in_c, - frame_b_in_d, - "func_b frame should be reused from cache", + # level4 should have different line numbers (it moved) + l4_1 = find_frame(frames1, "level4") + l4_2 = find_frame(frames2, "level4") + self.assertIsNotNone(l4_1) + self.assertIsNotNone(l4_2) + self.assertNotEqual( + l4_1.location.lineno, + l4_2.location.lineno, + "level4 should be at different lines", ) + # Parent frames (level1, level2, level3) should be reused from cache + for name in ["level1", "level2", "level3"]: + f1 = find_frame(frames1, name) + f2 = find_frame(frames2, name) + self.assertIsNotNone(f1, f"{name} missing from sample 1") + self.assertIsNotNone(f2, f"{name} missing from sample 2") + self.assertIs(f1, f2, f"{name} should be reused from cache") + @skip_if_not_supported @unittest.skipIf( sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_blocking.py b/Lib/test/test_profiling/test_sampling_profiler/test_blocking.py new file mode 100644 index 00000000000000..102eb51b556cc7 --- /dev/null +++ 
b/Lib/test/test_profiling/test_sampling_profiler/test_blocking.py
@@ -0,0 +1,141 @@
+"""Tests for blocking mode sampling profiler."""
+
+import io
+import textwrap
+import unittest
+from unittest import mock
+
+try:
+    import _remote_debugging  # noqa: F401
+    import profiling.sampling
+    import profiling.sampling.sample
+    from profiling.sampling.stack_collector import CollapsedStackCollector
+except ImportError:
+    raise unittest.SkipTest(
+        "Test only runs when _remote_debugging is available"
+    )
+
+from test.support import requires_remote_subprocess_debugging
+
+from .helpers import test_subprocess
+
+# Duration for profiling in tests
+PROFILING_DURATION_SEC = 1
+
+
+@requires_remote_subprocess_debugging()
+class TestBlockingModeStackAccuracy(unittest.TestCase):
+    """Test that blocking mode produces accurate stack traces.
+
+    When using blocking mode, the target process is stopped during sampling.
+    This ensures that we see accurate stack traces where functions appear
+    in the correct caller/callee relationship.
+
+    These tests verify that generator functions are correctly shown at the
+    top of the stack when they are actively executing, and not incorrectly
+    shown under their caller's code.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        # Test script that uses a generator consumed in a loop.
+        # When consume_generator is on the arithmetic lines (temp1, temp2, etc.),
+        # fibonacci_generator should NOT be in the stack at all.
+        # Line numbers are important here - see ARITHMETIC_LINES below.
+        cls.generator_script = textwrap.dedent('''
+            def fibonacci_generator(n):
+                a, b = 0, 1
+                for _ in range(n):
+                    yield a
+                    a, b = b, a + b
+
+            def consume_generator():
+                gen = fibonacci_generator(10000)
+                for value in gen:
+                    temp1 = value + 1
+                    temp2 = value * 2
+                    temp3 = value - 1
+                    result = temp1 + temp2 + temp3
+
+            def main():
+                while True:
+                    consume_generator()
+
+            _test_sock.sendall(b"working")
+            main()
+        ''')
+        # Line numbers of the arithmetic operations in consume_generator.
+        # These are the lines where fibonacci_generator should NOT be in the stack.
+        # The socket injection code adds 7 lines before our script.
+        # temp1 = value + 1 -> line 17
+        # temp2 = value * 2 -> line 18
+        # temp3 = value - 1 -> line 19
+        # result = ... -> line 20
+        cls.ARITHMETIC_LINES = {17, 18, 19, 20}
+
+    def test_generator_not_under_consumer_arithmetic(self):
+        """Test that fibonacci_generator doesn't appear when consume_generator does arithmetic.
+
+        When consume_generator is executing arithmetic lines (temp1, temp2, etc.),
+        fibonacci_generator should NOT be anywhere in the stack - it's not being
+        called at that point.
+
+        Valid stacks:
+          - consume_generator at 'for value in gen:' line WITH fibonacci_generator
+            at the top (generator is yielding)
+          - consume_generator at arithmetic lines WITHOUT fibonacci_generator
+            (we're just doing math, not calling the generator)
+
+        Invalid stacks (indicate torn/inconsistent reads):
+          - consume_generator at arithmetic lines WITH fibonacci_generator
+            anywhere in the stack
+
+        Note: call_tree is ordered from bottom (index 0) to top (index -1).
+        """
+        with test_subprocess(self.generator_script, wait_for_working=True) as subproc:
+            collector = CollapsedStackCollector(sample_interval_usec=100, skip_idle=False)
+
+            with (
+                io.StringIO() as captured_output,
+                mock.patch("sys.stdout", captured_output),
+            ):
+                profiling.sampling.sample.sample(
+                    subproc.process.pid,
+                    collector,
+                    duration_sec=PROFILING_DURATION_SEC,
+                    blocking=True,
+                )
+
+        # Analyze collected stacks
+        total_samples = 0
+        invalid_stacks = 0
+        arithmetic_samples = 0
+
+        for (call_tree, _thread_id), count in collector.stack_counter.items():
+            total_samples += count
+
+            if not call_tree:
+                continue
+
+            # Find consume_generator in the stack and check its line number
+            for _filename, lineno, funcname in call_tree:
+                if funcname == "consume_generator" and lineno in self.ARITHMETIC_LINES:
+                    arithmetic_samples += count
+                    # Check if fibonacci_generator appears anywhere in this stack
+                    func_names = [frame[2] for frame in call_tree]
+                    if "fibonacci_generator" in func_names:
+                        invalid_stacks += count
+                    break
+
+        self.assertGreater(total_samples, 10,
+            f"Expected more than 10 samples, got {total_samples}")
+
+        # We should have some samples on the arithmetic lines
+        self.assertGreater(arithmetic_samples, 0,
+            f"Expected some samples on arithmetic lines, got {arithmetic_samples}")
+
+        self.assertEqual(invalid_stacks, 0,
+            f"Found {invalid_stacks}/{arithmetic_samples} invalid stacks where "
+            f"fibonacci_generator appears in the stack when consume_generator "
+            f"is on an arithmetic line. This indicates torn/inconsistent stack "
+            f"traces are being captured.")
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_children.py b/Lib/test/test_profiling/test_sampling_profiler/test_children.py
index 9b4c741727ad8c..4007b3e8d7a41f 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_children.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_children.py
@@ -372,6 +372,8 @@ def test_subprocesses_incompatible_with_live(self):
             limit=None,
             no_summary=False,
             opcodes=False,
+            blocking=False,
+            interval=1000,
         )
 
         parser = argparse.ArgumentParser()
diff --git a/Misc/NEWS.d/next/Library/2025-12-20-02-33-05.gh-issue-138122.m3EF9E.rst b/Misc/NEWS.d/next/Library/2025-12-20-02-33-05.gh-issue-138122.m3EF9E.rst
new file mode 100644
index 00000000000000..a5785a88b0f433
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-12-20-02-33-05.gh-issue-138122.m3EF9E.rst
@@ -0,0 +1,2 @@
+Add blocking mode to Tachyon for accurate stack traces in applications with
+many generators or fast-changing call stacks. Patch by Pablo Galindo.
diff --git a/Modules/_remote_debugging/_remote_debugging.h b/Modules/_remote_debugging/_remote_debugging.h index 918fd2bbe4325c..4119d916be7a63 100644 --- a/Modules/_remote_debugging/_remote_debugging.h +++ b/Modules/_remote_debugging/_remote_debugging.h @@ -58,9 +58,49 @@ extern "C" { # endif #endif +// Platforms that support pausing/resuming threads for accurate stack sampling +#if defined(MS_WINDOWS) || defined(__linux__) || (defined(__APPLE__) && TARGET_OS_OSX) +# define Py_REMOTE_DEBUG_SUPPORTS_BLOCKING 1 +#endif + #ifdef MS_WINDOWS #include #include +#endif + +#if defined(__APPLE__) && TARGET_OS_OSX + +typedef struct { + mach_port_t task; + int suspended; +} _Py_RemoteDebug_ThreadsState; + +#elif defined(__linux__) + +typedef struct { + pid_t *tids; // Points to unwinder's reusable buffer + size_t count; // Number of threads currently seized +} _Py_RemoteDebug_ThreadsState; + +#elif defined(MS_WINDOWS) + +typedef NTSTATUS (NTAPI *NtSuspendProcessFunc)(HANDLE ProcessHandle); +typedef NTSTATUS (NTAPI *NtResumeProcessFunc)(HANDLE ProcessHandle); + +typedef struct { + HANDLE hProcess; + int suspended; +} _Py_RemoteDebug_ThreadsState; + +#else + +typedef struct { + int dummy; +} _Py_RemoteDebug_ThreadsState; + +#endif + +#ifdef MS_WINDOWS #define STATUS_SUCCESS ((NTSTATUS)0x00000000L) #define STATUS_INFO_LENGTH_MISMATCH ((NTSTATUS)0xC0000004L) typedef enum _WIN32_THREADSTATE { @@ -262,6 +302,15 @@ typedef struct { #ifdef MS_WINDOWS PVOID win_process_buffer; ULONG win_process_buffer_size; +#endif + // Thread stopping state (only on platforms that support it) +#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING + _Py_RemoteDebug_ThreadsState threads_state; + int threads_stopped; // 1 if threads are currently stopped +#endif +#ifdef __linux__ + pid_t *thread_tids; // Reusable buffer for thread IDs + size_t thread_tids_capacity; // Current capacity of thread_tids buffer #endif } RemoteUnwinderObject; @@ -295,6 +344,7 @@ typedef struct { uintptr_t gc_frame; // GC frame address (0 
if not tracking) uintptr_t last_profiled_frame; // Last cached frame (0 if no cache) StackChunkList *chunks; // Pre-copied stack chunks + int skip_first_frame; // Skip frame_addr itself (continue from its caller) /* Outputs */ PyObject *frame_info; // List to append FrameInfo objects @@ -518,6 +568,11 @@ extern PyObject* unwind_stack_for_thread( uintptr_t gc_frame ); +/* Thread stopping functions (for blocking mode) */ +extern void _Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st); +extern int _Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st); +extern void _Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st); + /* ============================================================================ * ASYNCIO FUNCTION DECLARATIONS * ============================================================================ */ diff --git a/Modules/_remote_debugging/clinic/module.c.h b/Modules/_remote_debugging/clinic/module.c.h index 263dfd685657da..15df48fabb56b2 100644 --- a/Modules/_remote_debugging/clinic/module.c.h +++ b/Modules/_remote_debugging/clinic/module.c.h @@ -435,6 +435,66 @@ _remote_debugging_RemoteUnwinder_get_stats(PyObject *self, PyObject *Py_UNUSED(i return return_value; } +PyDoc_STRVAR(_remote_debugging_RemoteUnwinder_pause_threads__doc__, +"pause_threads($self, /)\n" +"--\n" +"\n" +"Pause all threads in the target process.\n" +"\n" +"This stops all threads in the target process to allow for consistent\n" +"memory reads during sampling. 
Must be paired with a call to resume_threads().\n" +"\n" +"Returns True if threads were successfully paused, False if they were already paused.\n" +"\n" +"Raises:\n" +" RuntimeError: If there is an error stopping the threads"); + +#define _REMOTE_DEBUGGING_REMOTEUNWINDER_PAUSE_THREADS_METHODDEF \ + {"pause_threads", (PyCFunction)_remote_debugging_RemoteUnwinder_pause_threads, METH_NOARGS, _remote_debugging_RemoteUnwinder_pause_threads__doc__}, + +static PyObject * +_remote_debugging_RemoteUnwinder_pause_threads_impl(RemoteUnwinderObject *self); + +static PyObject * +_remote_debugging_RemoteUnwinder_pause_threads(PyObject *self, PyObject *Py_UNUSED(ignored)) +{ + PyObject *return_value = NULL; + + Py_BEGIN_CRITICAL_SECTION(self); + return_value = _remote_debugging_RemoteUnwinder_pause_threads_impl((RemoteUnwinderObject *)self); + Py_END_CRITICAL_SECTION(); + + return return_value; +} + +PyDoc_STRVAR(_remote_debugging_RemoteUnwinder_resume_threads__doc__, +"resume_threads($self, /)\n" +"--\n" +"\n" +"Resume all threads in the target process.\n" +"\n" +"This resumes threads that were previously paused with pause_threads().\n" +"\n" +"Returns True if threads were successfully resumed, False if they were not paused."); + +#define _REMOTE_DEBUGGING_REMOTEUNWINDER_RESUME_THREADS_METHODDEF \ + {"resume_threads", (PyCFunction)_remote_debugging_RemoteUnwinder_resume_threads, METH_NOARGS, _remote_debugging_RemoteUnwinder_resume_threads__doc__}, + +static PyObject * +_remote_debugging_RemoteUnwinder_resume_threads_impl(RemoteUnwinderObject *self); + +static PyObject * +_remote_debugging_RemoteUnwinder_resume_threads(PyObject *self, PyObject *Py_UNUSED(ignored)) +{ + PyObject *return_value = NULL; + + Py_BEGIN_CRITICAL_SECTION(self); + return_value = _remote_debugging_RemoteUnwinder_resume_threads_impl((RemoteUnwinderObject *)self); + Py_END_CRITICAL_SECTION(); + + return return_value; +} + PyDoc_STRVAR(_remote_debugging_BinaryWriter___init____doc__, "BinaryWriter(filename, 
sample_interval_us, start_time_us, *,\n" " compression=0)\n" @@ -1236,4 +1296,4 @@ _remote_debugging_is_python_process(PyObject *module, PyObject *const *args, Py_ exit: return return_value; } -/*[clinic end generated code: output=036de0b06d0e34cc input=a9049054013a1b77]*/ +/*[clinic end generated code: output=34f50b18f317b9b6 input=a9049054013a1b77]*/ diff --git a/Modules/_remote_debugging/frame_cache.c b/Modules/_remote_debugging/frame_cache.c index e94f4d3d81caea..b6566d7cff7b54 100644 --- a/Modules/_remote_debugging/frame_cache.c +++ b/Modules/_remote_debugging/frame_cache.c @@ -174,8 +174,16 @@ frame_cache_lookup_and_extend( Py_ssize_t num_frames = PyList_GET_SIZE(entry->frame_list); - // Extend frame_info with frames from start_idx onwards - PyObject *slice = PyList_GetSlice(entry->frame_list, start_idx, num_frames); + // Extend frame_info with frames ABOVE start_idx (not including it). + // The frame at start_idx (last_profiled_frame) was the executing frame + // in the previous sample and its line number may have changed. + // Only frames above it (its callers) are frozen at their call sites. 
+ Py_ssize_t cache_start = start_idx + 1; + if (cache_start >= num_frames) { + return 0; // Nothing above last_profiled_frame to extend with + } + + PyObject *slice = PyList_GetSlice(entry->frame_list, cache_start, num_frames); if (!slice) { return -1; } @@ -188,9 +196,9 @@ frame_cache_lookup_and_extend( return -1; } - // Also extend frame_addrs with cached addresses if provided + // Also extend frame_addrs with cached addresses (above last_profiled_frame) if (frame_addrs) { - for (Py_ssize_t i = start_idx; i < entry->num_addrs && *num_addrs < max_addrs; i++) { + for (Py_ssize_t i = cache_start; i < entry->num_addrs && *num_addrs < max_addrs; i++) { frame_addrs[(*num_addrs)++] = entry->addrs[i]; } } diff --git a/Modules/_remote_debugging/frames.c b/Modules/_remote_debugging/frames.c index 8aebd40671ccbf..d9fece6459875a 100644 --- a/Modules/_remote_debugging/frames.c +++ b/Modules/_remote_debugging/frames.c @@ -281,16 +281,7 @@ process_frame_chain( ctx->stopped_at_cached_frame = 0; ctx->last_frame_visited = 0; - if (ctx->last_profiled_frame != 0 && ctx->frame_addr == ctx->last_profiled_frame) { - ctx->stopped_at_cached_frame = 1; - return 0; - } - while ((void*)frame_addr != NULL) { - if (ctx->last_profiled_frame != 0 && frame_addr == ctx->last_profiled_frame) { - ctx->stopped_at_cached_frame = 1; - break; - } PyObject *frame = NULL; uintptr_t next_frame_addr = 0; uintptr_t stackpointer = 0; @@ -311,6 +302,14 @@ process_frame_chain( return -1; } } + + // Skip first frame if requested (used for cache miss continuation) + if (ctx->skip_first_frame && frame_count == 1) { + Py_XDECREF(frame); + frame_addr = next_frame_addr; + continue; + } + if (frame == NULL && PyList_GET_SIZE(ctx->frame_info) == 0) { const char *e = "Failed to parse initial frame in chain"; PyErr_SetString(PyExc_RuntimeError, e); @@ -367,6 +366,11 @@ process_frame_chain( Py_DECREF(frame); } + if (ctx->last_profiled_frame != 0 && frame_addr == ctx->last_profiled_frame) { + ctx->stopped_at_cached_frame 
= 1; + break; + } + prev_frame_addr = next_frame_addr; frame_addr = next_frame_addr; } @@ -548,14 +552,15 @@ collect_frames_with_cache( } if (cache_result == 0) { STATS_INC(unwinder, frame_cache_misses); - Py_ssize_t frames_before_walk = PyList_GET_SIZE(ctx->frame_info); + // Continue walking from last_profiled_frame, skipping it (already processed) + Py_ssize_t frames_before_walk = PyList_GET_SIZE(ctx->frame_info); FrameWalkContext continue_ctx = { .frame_addr = ctx->last_profiled_frame, .base_frame_addr = ctx->base_frame_addr, .gc_frame = ctx->gc_frame, - .last_profiled_frame = 0, .chunks = ctx->chunks, + .skip_first_frame = 1, .frame_info = ctx->frame_info, .frame_addrs = ctx->frame_addrs, .num_addrs = ctx->num_addrs, @@ -566,7 +571,6 @@ collect_frames_with_cache( } ctx->num_addrs = continue_ctx.num_addrs; ctx->last_frame_visited = continue_ctx.last_frame_visited; - STATS_ADD(unwinder, frames_read_from_memory, PyList_GET_SIZE(ctx->frame_info) - frames_before_walk); } else { // Partial cache hit - cached stack was validated as complete when stored, diff --git a/Modules/_remote_debugging/module.c b/Modules/_remote_debugging/module.c index ea53dbf5996ab2..e14a23357fb400 100644 --- a/Modules/_remote_debugging/module.c +++ b/Modules/_remote_debugging/module.c @@ -342,6 +342,9 @@ _remote_debugging_RemoteUnwinder___init___impl(RemoteUnwinderObject *self, self->skip_non_matching_threads = skip_non_matching_threads; self->cached_state = NULL; self->frame_cache = NULL; +#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING + self->threads_stopped = 0; +#endif // Initialize stats to zero memset(&self->stats, 0, sizeof(self->stats)); if (_Py_RemoteDebug_InitProcHandle(&self->handle, pid) < 0) { @@ -423,6 +426,10 @@ _remote_debugging_RemoteUnwinder___init___impl(RemoteUnwinderObject *self, self->win_process_buffer = NULL; self->win_process_buffer_size = 0; #endif +#ifdef __linux__ + self->thread_tids = NULL; + self->thread_tids_capacity = 0; +#endif if (cache_frames && 
frame_cache_init(self) < 0) { return -1; @@ -930,11 +937,81 @@ _remote_debugging_RemoteUnwinder_get_stats_impl(RemoteUnwinderObject *self) return result; } +/*[clinic input] +@critical_section +_remote_debugging.RemoteUnwinder.pause_threads + +Pause all threads in the target process. + +This stops all threads in the target process to allow for consistent +memory reads during sampling. Must be paired with a call to resume_threads(). + +Returns True if threads were successfully paused, False if they were already paused. + +Raises: + RuntimeError: If there is an error stopping the threads +[clinic start generated code]*/ + +static PyObject * +_remote_debugging_RemoteUnwinder_pause_threads_impl(RemoteUnwinderObject *self) +/*[clinic end generated code: output=aaf2bdc0a725750c input=78601c60dbc245fe]*/ +{ +#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING + if (self->threads_stopped) { + Py_RETURN_FALSE; + } + + _Py_RemoteDebug_InitThreadsState(self, &self->threads_state); + if (_Py_RemoteDebug_StopAllThreads(self, &self->threads_state) < 0) { + return NULL; + } + + self->threads_stopped = 1; + Py_RETURN_TRUE; +#else + PyErr_SetString(PyExc_NotImplementedError, + "pause_threads is not supported on this platform"); + return NULL; +#endif +} + +/*[clinic input] +@critical_section +_remote_debugging.RemoteUnwinder.resume_threads + +Resume all threads in the target process. + +This resumes threads that were previously paused with pause_threads(). + +Returns True if threads were successfully resumed, False if they were not paused. 
+[clinic start generated code]*/ + +static PyObject * +_remote_debugging_RemoteUnwinder_resume_threads_impl(RemoteUnwinderObject *self) +/*[clinic end generated code: output=8d6781ea37095536 input=67ca813bd804289e]*/ +{ +#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING + if (!self->threads_stopped) { + Py_RETURN_FALSE; + } + + _Py_RemoteDebug_ResumeAllThreads(self, &self->threads_state); + self->threads_stopped = 0; + Py_RETURN_TRUE; +#else + PyErr_SetString(PyExc_NotImplementedError, + "resume_threads is not supported on this platform"); + return NULL; +#endif +} + static PyMethodDef RemoteUnwinder_methods[] = { _REMOTE_DEBUGGING_REMOTEUNWINDER_GET_STACK_TRACE_METHODDEF _REMOTE_DEBUGGING_REMOTEUNWINDER_GET_ALL_AWAITED_BY_METHODDEF _REMOTE_DEBUGGING_REMOTEUNWINDER_GET_ASYNC_STACK_TRACE_METHODDEF _REMOTE_DEBUGGING_REMOTEUNWINDER_GET_STATS_METHODDEF + _REMOTE_DEBUGGING_REMOTEUNWINDER_PAUSE_THREADS_METHODDEF + _REMOTE_DEBUGGING_REMOTEUNWINDER_RESUME_THREADS_METHODDEF {NULL, NULL} }; @@ -943,6 +1020,20 @@ RemoteUnwinder_dealloc(PyObject *op) { RemoteUnwinderObject *self = RemoteUnwinder_CAST(op); PyTypeObject *tp = Py_TYPE(self); + +#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING + if (self->threads_stopped) { + _Py_RemoteDebug_ResumeAllThreads(self, &self->threads_state); + self->threads_stopped = 0; + } +#endif +#ifdef __linux__ + if (self->thread_tids != NULL) { + PyMem_RawFree(self->thread_tids); + self->thread_tids = NULL; + } +#endif + if (self->code_object_cache) { _Py_hashtable_destroy(self->code_object_cache); } diff --git a/Modules/_remote_debugging/threads.c b/Modules/_remote_debugging/threads.c index 3a5b8adb3f4dda..d40031abbdc0c7 100644 --- a/Modules/_remote_debugging/threads.c +++ b/Modules/_remote_debugging/threads.c @@ -11,6 +11,12 @@ #include #endif +#ifdef __linux__ +#include +#include +#include +#endif + /* ============================================================================ * THREAD ITERATION FUNCTIONS * 
============================================================================ */ @@ -501,3 +507,286 @@ unwind_stack_for_thread( cleanup_stack_chunks(&chunks); return NULL; } + +/* ============================================================================ + * PROCESS STOP FUNCTIONS + * ============================================================================ */ + +#if defined(__APPLE__) && TARGET_OS_OSX + +void +_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + st->task = MACH_PORT_NULL; + st->suspended = 0; +} + +int +_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + kern_return_t kr = task_suspend(unwinder->handle.task); + if (kr != KERN_SUCCESS) { + if (kr == MACH_SEND_INVALID_DEST) { + PyErr_Format(PyExc_ProcessLookupError, + "Process %d has terminated", unwinder->handle.pid); + } else { + PyErr_Format(PyExc_RuntimeError, + "task_suspend failed for PID %d: kern_return_t %d", + unwinder->handle.pid, kr); + } + return -1; + } + + st->task = unwinder->handle.task; + st->suspended = 1; + _Py_RemoteDebug_ClearCache(&unwinder->handle); + return 0; +} + +void +_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + if (!st->suspended || st->task == MACH_PORT_NULL) { + return; + } + task_resume(st->task); + st->task = MACH_PORT_NULL; + st->suspended = 0; +} + +#elif defined(__linux__) + +void +_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + st->tids = NULL; + st->count = 0; +} + +static int +read_thread_ids(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + char task_path[64]; + snprintf(task_path, sizeof(task_path), "/proc/%d/task", unwinder->handle.pid); + + DIR *dir = opendir(task_path); + if (dir == NULL) { + st->tids = NULL; + st->count = 0; + if (errno == ENOENT || errno == ESRCH) { + PyErr_Format(PyExc_ProcessLookupError, + "Process 
%d has terminated", unwinder->handle.pid); + } else { + PyErr_SetFromErrnoWithFilename(PyExc_OSError, task_path); + } + return -1; + } + + st->count = 0; + + struct dirent *entry; + while ((entry = readdir(dir)) != NULL) { + if (entry->d_name[0] < '1' || entry->d_name[0] > '9') { + continue; + } + char *endptr; + long tid = strtol(entry->d_name, &endptr, 10); + if (*endptr != '\0' || tid <= 0) { + continue; + } + if (st->count >= unwinder->thread_tids_capacity) { + size_t new_cap = unwinder->thread_tids_capacity == 0 ? 64 : unwinder->thread_tids_capacity * 2; + pid_t *new_tids = PyMem_RawRealloc(unwinder->thread_tids, new_cap * sizeof(pid_t)); + if (new_tids == NULL) { + closedir(dir); + st->tids = NULL; + st->count = 0; + PyErr_NoMemory(); + return -1; + } + unwinder->thread_tids = new_tids; + unwinder->thread_tids_capacity = new_cap; + } + unwinder->thread_tids[st->count++] = (pid_t)tid; + } + + st->tids = unwinder->thread_tids; + closedir(dir); + return 0; +} + +static inline void +detach_threads(_Py_RemoteDebug_ThreadsState *st, size_t up_to) +{ + for (size_t j = 0; j < up_to; j++) { + ptrace(PTRACE_DETACH, st->tids[j], NULL, NULL); + } +} + +static int +seize_thread(pid_t tid) +{ + if (ptrace(PTRACE_SEIZE, tid, NULL, 0) == 0) { + return 0; + } + if (errno == ESRCH) { + return 1; // Thread gone, skip + } + if (errno == EINVAL || errno == EIO) { + // Fallback for older kernels + if (ptrace(PTRACE_ATTACH, tid, NULL, NULL) == 0) { + int status; + waitpid(tid, &status, __WALL); + return 0; + } + if (errno == ESRCH) { + return 1; // Thread gone + } + } + return -1; // Real error +} + +int +_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + if (read_thread_ids(unwinder, st) < 0) { + return -1; + } + + for (size_t i = 0; i < st->count; i++) { + pid_t tid = st->tids[i]; + + int ret = seize_thread(tid); + if (ret == 1) { + continue; // Thread gone, skip + } + if (ret < 0) { + detach_threads(st, i); + 
PyErr_Format(PyExc_RuntimeError, "Failed to seize thread %d: %s", tid, strerror(errno)); + st->tids = NULL; + st->count = 0; + return -1; + } + + if (ptrace(PTRACE_INTERRUPT, tid, NULL, NULL) == -1 && errno != ESRCH) { + detach_threads(st, i + 1); + PyErr_Format(PyExc_RuntimeError, "Failed to interrupt thread %d: %s", tid, strerror(errno)); + st->tids = NULL; + st->count = 0; + return -1; + } + + int status; + if (waitpid(tid, &status, __WALL) == -1 && errno != ECHILD && errno != ESRCH) { + detach_threads(st, i + 1); + PyErr_Format(PyExc_RuntimeError, "waitpid failed for thread %d: %s", tid, strerror(errno)); + st->tids = NULL; + st->count = 0; + return -1; + } + } + + return 0; +} + +void +_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + if (st->tids == NULL || st->count == 0) { + return; + } + detach_threads(st, st->count); + st->tids = NULL; + st->count = 0; +} + +#elif defined(MS_WINDOWS) + +void +_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + st->hProcess = NULL; + st->suspended = 0; +} + +int +_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + static NtSuspendProcessFunc pNtSuspendProcess = NULL; + static int tried_load = 0; + + if (!tried_load) { + HMODULE hNtdll = GetModuleHandleW(L"ntdll.dll"); + if (hNtdll) { + pNtSuspendProcess = (NtSuspendProcessFunc)GetProcAddress(hNtdll, "NtSuspendProcess"); + } + tried_load = 1; + } + + if (pNtSuspendProcess == NULL) { + PyErr_SetString(PyExc_RuntimeError, "NtSuspendProcess not available"); + return -1; + } + + NTSTATUS status = pNtSuspendProcess(unwinder->handle.hProcess); + if (status >= 0) { + st->hProcess = unwinder->handle.hProcess; + st->suspended = 1; + _Py_RemoteDebug_ClearCache(&unwinder->handle); + return 0; + } + + PyErr_Format(PyExc_RuntimeError, "NtSuspendProcess failed: 0x%lx", status); + return -1; +} + +void 
+_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + if (!st->suspended || st->hProcess == NULL) { + return; + } + + static NtResumeProcessFunc pNtResumeProcess = NULL; + static int tried_load = 0; + + if (!tried_load) { + HMODULE hNtdll = GetModuleHandleW(L"ntdll.dll"); + if (hNtdll) { + pNtResumeProcess = (NtResumeProcessFunc)GetProcAddress(hNtdll, "NtResumeProcess"); + } + tried_load = 1; + } + + if (pNtResumeProcess != NULL) { + pNtResumeProcess(st->hProcess); + } + st->hProcess = NULL; + st->suspended = 0; +} + +#else + +void +_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + (void)unwinder; + (void)st; +} + +int +_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + (void)unwinder; + (void)st; + return 0; +} + +void +_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st) +{ + (void)unwinder; + (void)st; +} + +#endif diff --git a/Python/remote_debug.h b/Python/remote_debug.h index d3932a3fd1e4d6..d9c5c480fe9a86 100644 --- a/Python/remote_debug.h +++ b/Python/remote_debug.h @@ -33,6 +33,9 @@ extern "C" { #ifdef __linux__ # include # include +# include +# include +# include # if INTPTR_MAX == INT64_MAX # define Elf_Ehdr Elf64_Ehdr # define Elf_Shdr Elf64_Shdr @@ -43,6 +46,17 @@ extern "C" { # define Elf_Phdr Elf32_Phdr # endif # include + +// PTRACE options - define if not available +# ifndef PTRACE_SEIZE +# define PTRACE_SEIZE 0x4206 +# endif +# ifndef PTRACE_INTERRUPT +# define PTRACE_INTERRUPT 0x4207 +# endif +# ifndef PTRACE_EVENT_STOP +# define PTRACE_EVENT_STOP 128 +# endif #endif #if defined(__APPLE__) && defined(TARGET_OS_OSX) && TARGET_OS_OSX @@ -55,6 +69,7 @@ extern "C" { # include # include # include +# include # include # include # include @@ -169,7 +184,7 @@ _Py_RemoteDebug_InitProcHandle(proc_handle_t *handle, pid_t pid) { } #elif defined(MS_WINDOWS) 
handle->hProcess = OpenProcess( - PROCESS_VM_READ | PROCESS_VM_WRITE | PROCESS_VM_OPERATION | PROCESS_QUERY_INFORMATION, + PROCESS_VM_READ | PROCESS_VM_WRITE | PROCESS_VM_OPERATION | PROCESS_QUERY_INFORMATION | PROCESS_SUSPEND_RESUME, FALSE, pid); if (handle->hProcess == NULL) { PyErr_SetFromWindowsErr(0); diff --git a/Tools/inspection/benchmark_external_inspection.py b/Tools/inspection/benchmark_external_inspection.py index 9c40c2f4492e58..fee3435496da0b 100644 --- a/Tools/inspection/benchmark_external_inspection.py +++ b/Tools/inspection/benchmark_external_inspection.py @@ -167,7 +167,7 @@ def create_threads(n): } -def benchmark(unwinder, duration_seconds=10): +def benchmark(unwinder, duration_seconds=10, blocking=False): """Benchmark mode - measure raw sampling speed for specified duration""" sample_count = 0 fail_count = 0 @@ -187,9 +187,15 @@ def benchmark(unwinder, duration_seconds=10): total_attempts += 1 work_start = time.perf_counter() try: - stack_trace = unwinder.get_stack_trace() - if stack_trace: - sample_count += 1 + if blocking: + unwinder.pause_threads() + try: + stack_trace = unwinder.get_stack_trace() + if stack_trace: + sample_count += 1 + finally: + if blocking: + unwinder.resume_threads() except (OSError, RuntimeError, UnicodeDecodeError) as e: fail_count += 1 @@ -353,6 +359,12 @@ def parse_arguments(): help="Which threads to include in the benchmark (default: all)", ) + parser.add_argument( + "--blocking", + action="store_true", + help="Stop all threads before sampling for consistent snapshots", + ) + return parser.parse_args() @@ -408,6 +420,9 @@ def main(): print( f"{colors.CYAN}Benchmark Duration:{colors.RESET} {colors.YELLOW}{args.duration}{colors.RESET} seconds" ) + print( + f"{colors.CYAN}Blocking Mode:{colors.RESET} {colors.GREEN if args.blocking else colors.YELLOW}{'enabled' if args.blocking else 'disabled'}{colors.RESET}" + ) process = None temp_file_path = None @@ -436,7 +451,7 @@ def main(): unwinder = 
_remote_debugging.RemoteUnwinder( process.pid, cache_frames=True, **kwargs ) - results = benchmark(unwinder, duration_seconds=args.duration) + results = benchmark(unwinder, duration_seconds=args.duration, blocking=args.blocking) finally: cleanup_process(process, temp_file_path)