Commit 14ebe21

Authored by P403n1x87, avara1986, mabdinur, VianneyRuhlmann, and brettlangdon
refactor: native fork-safe threads (#14163)
We refactor the native periodic thread implementation to be fork-safe: all such threads that are running at the time of a fork are automatically stopped before the fork, then restarted after it. We also take care to avoid stopping and restarting threads in the parent process if we detect an immediate call to `fork` again.

One implication of this change is that fork-safe synchronisation objects, such as Locks and Events, are no longer needed, because all (ddtrace) threads are guaranteed to be stopped before a fork and restarted afterwards. There is also no longer any need to manually recreate threads, as is currently done in many places. The only remaining requirement is to ensure that the state of the periodic services is as expected after a fork. For example, many products need to avoid sending duplicate values from different processes. This can be achieved either by subclassing `ForksafeAwakeablePeriodicService` and implementing the `reset` method, which lets a periodic service trigger its reset logic automatically on fork (the preferred way), or by the current approach of registering a fork-safe hook.

## Performance Analysis

Stopping and restarting threads can be expensive, but it gives good fork-safety guarantees. Because threads have to be joined before the fork can continue, a forked process might be delayed by a thread that is currently busy on I/O. Since our periodic threads run on periods that are O(1s), we can expect such unfortunate events to be quite rare. Besides, a process is most likely to fork at the very beginning of its execution, before the periodic threads have had a chance to trigger their first periodic action. The timer set around closely spaced `fork` invocations should reduce delays even further by preventing the parent process from stopping and restarting threads in between `fork` calls.
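The preferred reset pattern described above can be sketched as follows. This is a minimal model, not the actual ddtrace code: the simplified `ForksafeAwakeablePeriodicService` base class and the `MetricsService` subclass are hypothetical stand-ins that only illustrate how a `reset` override clears per-process state after a fork.

```python
import os


class ForksafeAwakeablePeriodicService:
    """Stand-in for the ddtrace base class: a periodic service whose
    reset() hook runs automatically in the child after a fork."""

    def __init__(self, interval):
        self.interval = interval
        if hasattr(os, "register_at_fork"):  # POSIX only
            os.register_at_fork(after_in_child=self.reset)

    def reset(self):
        """Overridden by subclasses to clear per-process state."""


class MetricsService(ForksafeAwakeablePeriodicService):
    def __init__(self):
        super().__init__(interval=1.0)
        self.buffer = []

    def periodic(self):
        # Called by the (native) periodic thread on each interval.
        self.buffer.append("metric")

    def reset(self):
        # Drop buffered values so the child does not re-send data
        # the parent already reported.
        self.buffer.clear()


svc = MetricsService()
svc.periodic()
svc.reset()  # in ddtrace this is triggered by the after-fork handling
print(len(svc.buffer))  # → 0
```

With the thread lifecycle handled natively, `reset` is the only piece a service still has to provide to stay fork-correct.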
To give some numbers: invoking 1000 forks in a tight loop with the tracer and the profiler enabled takes ~1.2s to complete on an M1. In comparison, the same loop without `ddtrace` takes ~0.3s, and the same loop with a stop/restart in between forks takes ~1.5s. The wall-time view through a profiler gives the following picture:

<img width="1490" height="350" alt="Screenshot 2025-07-29 at 11 27 49" src="https://github.com/user-attachments/assets/8e336a9e-96b0-4c96-be62-f76cac8cd8e0" />

A good portion of the overhead comes from the tracing of `fork` itself, which could be improved by moving it to the native layer in follow-up work.

## Checklist

- [ ] PR author has checked that all the criteria below are met
  - The PR description includes an overview of the change
  - The PR description articulates the motivation for the change
  - The change includes tests OR the PR description describes a testing strategy
  - The PR description notes risks associated with the change, if any
  - Newly-added code is easy to change
  - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
  - The change includes or references documentation updates if necessary
  - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [ ] Reviewer has checked that all the criteria below are met
  - Title is accurate
  - All changes are related to the pull request's stated goal
  - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
  - Testing strategy adequately addresses listed risks
  - Newly-added code is easy to change
  - Release note makes sense to a user of the library
  - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
  - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Alberto Vara <alberto.vara@datadoghq.com>
Co-authored-by: Munir Abdinur <munir.abdinur@datadoghq.com>
Co-authored-by: vianney <vianney.ruhlmann@datadoghq.com>
Co-authored-by: Brett Langdon <brett.langdon@datadoghq.com>
Co-authored-by: Gyuheon Oh <gyuheon.oh@datadoghq.com>
Co-authored-by: Gyuheon Oh <102937919+gyuheon0h@users.noreply.github.com>
Co-authored-by: Thomas Kowalski <thomas.kowalski@datadoghq.com>
Co-authored-by: Juanjo Alvarez Martinez <juanjo.alvarezmartinez@datadoghq.com>
Parent: e95d75c · Commit: 14ebe21

38 files changed: 479 additions & 383 deletions


### ddtrace/_trace/context.py (4 additions, 4 deletions)

```diff
@@ -1,6 +1,5 @@
 import base64
 import re
-import threading
 from typing import Any
 from typing import Optional
 from typing import Text
@@ -14,6 +13,7 @@
 from ddtrace.internal.constants import W3C_TRACEPARENT_KEY
 from ddtrace.internal.constants import W3C_TRACESTATE_KEY
 from ddtrace.internal.logger import get_logger
+from ddtrace.internal.threads import RLock
 from ddtrace.internal.utils.http import w3c_get_dd_list_member as _w3c_get_dd_list_member
@@ -60,7 +60,7 @@ def __init__(
         sampling_priority: Optional[float] = None,
         meta: Optional[dict[str, str]] = None,
         metrics: Optional[dict[str, NumericType]] = None,
-        lock: Optional[threading.RLock] = None,
+        lock: Optional[RLock] = None,
         span_links: Optional[list[SpanLink]] = None,
         baggage: Optional[dict[str, Any]] = None,
         is_remote: bool = True,
@@ -89,7 +89,7 @@ def __init__(
         # DEV: A `forksafe.RLock` is not necessary here since Contexts
         # are recreated by the tracer after fork
         # https://github.com/DataDog/dd-trace-py/blob/a1932e8ddb704d259ea8a3188d30bf542f59fd8d/ddtrace/tracer.py#L489-L508
-        self._lock = threading.RLock()
+        self._lock = RLock()

     def __getstate__(self) -> _ContextState:
         return (
@@ -116,7 +116,7 @@ def __setstate__(self, state: _ContextState) -> None:
             self._reactivate,
         ) = state
         # We cannot serialize and lock, so we must recreate it unless we already have one
-        self._lock = threading.RLock()
+        self._lock = RLock()

     def __enter__(self) -> "Context":
         self._lock.acquire()
```
### ddtrace/contrib/internal/ray/span_manager.py (1 addition, 1 deletion)

```diff
@@ -16,8 +16,8 @@
 from ddtrace.constants import ERROR_MSG
 from ddtrace.constants import SPAN_KIND
 from ddtrace.ext import SpanKind
-from ddtrace.internal.forksafe import Lock
 from ddtrace.internal.logger import get_logger
+from ddtrace.internal.threads import Lock

 from .constants import DD_PARTIAL_VERSION
 from .constants import DD_WAS_LONG_RUNNING
```

### ddtrace/contrib/internal/subprocess/patch.py (1 addition, 1 deletion)

```diff
@@ -15,10 +15,10 @@
 from ddtrace.contrib.internal.subprocess.constants import COMMANDS
 from ddtrace.ext import SpanTypes
 from ddtrace.internal import core
-from ddtrace.internal.forksafe import RLock
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.settings._config import config
 from ddtrace.internal.settings.asm import config as asm_config
+from ddtrace.internal.threads import RLock
 from ddtrace.trace import tracer
```

### ddtrace/debugging/_encoding.py (2 additions, 2 deletions)

```diff
@@ -16,10 +16,10 @@
 from ddtrace.debugging._config import di_config
 from ddtrace.debugging._signal.log import LogSignal
 from ddtrace.debugging._signal.snapshot import Snapshot
-from ddtrace.internal import forksafe
 from ddtrace.internal import process_tags
 from ddtrace.internal._encoding import BufferFull
 from ddtrace.internal.logger import get_logger
+from ddtrace.internal.threads import RLock
 from ddtrace.internal.utils.formats import format_trace_id
@@ -322,7 +322,7 @@ def __init__(
     ) -> None:
         self._encoder = encoder
         self._buffer = JsonBuffer(buffer_size)
-        self._lock = forksafe.RLock()
+        self._lock = RLock()
         self._on_full = on_full
         self.count = 0
         self.max_size = buffer_size - self._buffer.size
```

### ddtrace/debugging/_origin/span.py (1 addition, 1 deletion)

```diff
@@ -17,9 +17,9 @@
 from ddtrace.debugging._signal.snapshot import Snapshot
 from ddtrace.debugging._uploader import SignalUploader
 from ddtrace.debugging._uploader import UploaderProduct
-from ddtrace.internal.forksafe import Lock
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.safety import _isinstance
+from ddtrace.internal.threads import Lock
 from ddtrace.internal.wrapping.context import LazyWrappingContext
```

### ddtrace/debugging/_probe/registry.py (2 additions, 2 deletions)

```diff
@@ -6,8 +6,8 @@
 from ddtrace.debugging._probe.model import Probe
 from ddtrace.debugging._probe.model import ProbeLocationMixin
 from ddtrace.debugging._probe.status import ProbeStatusLogger
-from ddtrace.internal import forksafe
 from ddtrace.internal.logger import get_logger
+from ddtrace.internal.threads import RLock


 logger = get_logger(__name__)
@@ -66,7 +66,7 @@ def __init__(self, status_logger: ProbeStatusLogger, *args: Any, **kwargs: Any)
         # Used to keep track of probes pending installation
         self._pending: dict[str, list[Probe]] = defaultdict(list)

-        self._lock = forksafe.RLock()
+        self._lock = RLock()

     def register(self, *probes: Probe) -> None:
         """Register a probe."""
```

### ddtrace/internal/_threads.cpp (57 additions, 35 deletions)

```diff
@@ -244,6 +244,12 @@ class Event
         return _cond.wait_for(lock, timeout, [this]() { return _set; });
     }

+    bool wait(std::chrono::time_point<std::chrono::steady_clock> until)
+    {
+        std::unique_lock<std::mutex> lock(_mutex);
+        return _cond.wait_until(lock, until, [this]() { return _set; });
+    }
+
     void clear()
     {
         std::lock_guard<std::mutex> lock(_mutex);
@@ -273,7 +279,9 @@ typedef struct periodic_thread

     bool _stopping;
     bool _atexit;
-    bool _after_fork;
+    bool _skip_shutdown;
+
+    std::chrono::time_point<std::chrono::steady_clock> _next_call_time;

     std::unique_ptr<Event> _started;
     std::unique_ptr<Event> _stopped;
@@ -304,8 +312,6 @@ static PyMemberDef PeriodicThread_members[] = {
       0,
       "whether to ignore the thread for profiling" },

-    { "_is_after_fork", T_BOOL, offsetof(PeriodicThread, _after_fork), READONLY, "whether the thread is after fork" },
-
     { NULL } /* Sentinel */
 };

@@ -341,7 +347,7 @@ PeriodicThread_init(PeriodicThread* self, PyObject* args, PyObject* kwargs)

     self->_stopping = false;
     self->_atexit = false;
-    self->_after_fork = false;
+    self->_skip_shutdown = false;

     self->_started = std::make_unique<Event>();
     self->_stopped = std::make_unique<Event>();
@@ -383,16 +389,8 @@ PeriodicThread__on_shutdown(PeriodicThread* self)

 // ----------------------------------------------------------------------------
 static PyObject*
-PeriodicThread_start(PeriodicThread* self, PyObject* args)
+PeriodicThread_start(PeriodicThread* self, PyObject* Py_UNUSED(args))
 {
-    // After fork, the child process should not restart threads that were running in the parent
-    // until properly reinitialized through forksafe handlers. This prevents crashes when
-    // pthread_create is called before threading state is safe (e.g., in uvloop's _after_fork).
-    // Check this first because after fork, self->_thread is non-null but the thread doesn't exist.
-    if (self->_after_fork) {
-        Py_RETURN_NONE;
-    }
-
     if (self->_thread != nullptr) {
         PyErr_SetString(PyExc_RuntimeError, "Thread already started");
         return NULL;
@@ -401,6 +399,11 @@ PeriodicThread_start(PeriodicThread* self, PyObject* args)
     if (self->_stopping)
         Py_RETURN_NONE;

+    // Initialize the next call time to the current time plus the interval.
+    // This ensures that the first call happens after the specified interval.
+    self->_next_call_time =
+      std::chrono::steady_clock::now() + std::chrono::milliseconds((long long)(self->interval * 1000));
+
     // Start the thread
     self->_thread = std::make_unique<std::thread>([self]() {
         GILGuard _gil;
@@ -423,15 +426,14 @@ PeriodicThread_start(PeriodicThread* self, PyObject* args)
         self->_started->set();

         bool error = false;
-        auto interval = std::chrono::milliseconds((long long)(self->interval * 1000));
         if (self->_no_wait_at_start)
             self->_request->set();

         while (!self->_stopping) {
             {
                 AllowThreads _;

-                if (self->_request->wait(interval)) {
+                if (self->_request->wait(self->_next_call_time)) {
                     if (self->_stopping)
                         break;

@@ -454,6 +456,9 @@ PeriodicThread_start(PeriodicThread* self, PyObject* args)
                 break;
             }

+            self->_next_call_time =
+              std::chrono::steady_clock::now() + std::chrono::milliseconds((long long)(self->interval * 1000));
+
             // If this came from a request mark it as served
             self->_served->set();
         }
@@ -464,7 +469,7 @@ PeriodicThread_start(PeriodicThread* self, PyObject* args)

         // Run the shutdown callback if there was no error and we are not
         // at Python shutdown.
-        if (!self->_atexit && !error && self->_on_shutdown != Py_None) {
+        if (!self->_atexit && !error && self->_on_shutdown != Py_None && !self->_skip_shutdown) {
 #if PY_VERSION_HEX >= 0x30d0000
             if (!Py_IsFinalizing()) {
 #else
@@ -474,6 +479,8 @@ PeriodicThread_start(PeriodicThread* self, PyObject* args)
             }
         }

+        PyDict_DelItem(_periodic_threads, self->ident);
+
         // Notify the join method that the thread has stopped
         self->_stopped->set();
     });
@@ -493,14 +500,14 @@ PeriodicThread_start(PeriodicThread* self, PyObject* args)

 // ----------------------------------------------------------------------------
 static PyObject*
-PeriodicThread_awake(PeriodicThread* self, PyObject* args)
+PeriodicThread_awake(PeriodicThread* self, PyObject* Py_UNUSED(args))
 {
     if (self->_thread == nullptr) {
         PyErr_SetString(PyExc_RuntimeError, "Thread not started");
         return NULL;
     }

-    if (!self->_after_fork) {
+    {
         AllowThreads _;
         std::lock_guard<std::mutex> lock(*self->_awake_mutex);

@@ -514,19 +521,15 @@ PeriodicThread_awake(PeriodicThread* self, PyObject* args)

 // ----------------------------------------------------------------------------
 static PyObject*
-PeriodicThread_stop(PeriodicThread* self, PyObject* args)
+PeriodicThread_stop(PeriodicThread* self, PyObject* Py_UNUSED(args))
 {
     if (self->_thread == nullptr) {
         PyErr_SetString(PyExc_RuntimeError, "Thread not started");
         return NULL;
     }

-    if (!self->_after_fork) {
-        // The thread is no longer running so it makes no sense to stop it.
-        // Attempting to acquire the Event lock could deadlock.
-        self->_stopping = true;
-        self->_request->set();
-    }
+    self->_stopping = true;
+    self->_request->set();

     Py_RETURN_NONE;
 }
@@ -545,11 +548,6 @@ PeriodicThread_join(PeriodicThread* self, PyObject* args, PyObject* kwargs)
         return NULL;
     }

-    if (self->_after_fork) {
-        // The thread is no longer running so it makes no sense to join it.
-        Py_RETURN_NONE;
-    }
-
     PyObject* timeout = Py_None;

     if (args != NULL && kwargs != NULL) {
@@ -586,7 +584,7 @@ PeriodicThread_join(PeriodicThread* self, PyObject* args, PyObject* kwargs)

 // ----------------------------------------------------------------------------
 static PyObject*
-PeriodicThread__atexit(PeriodicThread* self, PyObject* args)
+PeriodicThread__atexit(PeriodicThread* self, PyObject* Py_UNUSED(args))
 {
     self->_atexit = true;

@@ -601,9 +599,32 @@ PeriodicThread__atexit(PeriodicThread* self, PyObject* args)

 // ----------------------------------------------------------------------------
 static PyObject*
-PeriodicThread__after_fork(PeriodicThread* self, PyObject* args)
+PeriodicThread__after_fork(PeriodicThread* self, PyObject* Py_UNUSED(args))
 {
-    self->_after_fork = true;
+    self->_thread = nullptr;
+
+    self->_stopping = false;
+    self->_atexit = false;
+    self->_skip_shutdown = false;
+
+    // We don't clear the request event because we might have pending awake
+    // requests.
+    self->_started->clear();
+    self->_stopped->clear();
+    self->_served->clear();
+
+    PeriodicThread_start(self, NULL);
+
+    Py_RETURN_NONE;
+}
+
+// ----------------------------------------------------------------------------
+static PyObject*
+PeriodicThread__before_fork(PeriodicThread* self, PyObject* Py_UNUSED(args))
+{
+    self->_skip_shutdown = true;
+
+    PeriodicThread_stop(self, NULL);

     Py_RETURN_NONE;
 }
@@ -661,8 +682,9 @@ static PyMethodDef PeriodicThread_methods[] = {
     { "join", (PyCFunction)PeriodicThread_join, METH_VARARGS | METH_KEYWORDS, "Join the thread" },
     /* Private */
     { "_atexit", (PyCFunction)PeriodicThread__atexit, METH_NOARGS, "Stop the thread at exit" },
-    { "_after_fork", (PyCFunction)PeriodicThread__after_fork, METH_NOARGS, "Mark the thread as after fork" },
-    { NULL } /* Sentinel */
+    { "_after_fork", (PyCFunction)PeriodicThread__after_fork, METH_NOARGS, "Refresh the thread after fork" },
+    { "_before_fork", (PyCFunction)PeriodicThread__before_fork, METH_NOARGS, "Prepare the thread for fork" },
+    { NULL, NULL, 0, NULL } /* Sentinel */
 };

 // ----------------------------------------------------------------------------
```
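The `_before_fork`/`_after_fork` pair above stops and joins the native thread before the fork, then resets its state and restarts it afterwards. A rough Python model of that lifecycle, assuming a toy `PeriodicThread` built on `threading` rather than the native implementation (the hook wiring via `os.register_at_fork` mirrors what the library does, but the class itself is hypothetical):

```python
import os
import threading
import time


class PeriodicThread:
    """Toy model of the native periodic thread's fork lifecycle."""

    def __init__(self, interval, target):
        self.interval = interval
        self.target = target
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # wait() returns False on timeout (run the action) and True
        # when the stop event is set (exit the loop).
        while not self._stop.wait(self.interval):
            self.target()

    def _before_fork(self):
        # Stop and join so that no thread is running across the fork.
        self._stop.set()
        self._thread.join()

    def _after_fork(self):
        # In parent and child alike: restart from a clean state.
        self._thread = None
        self.start()


counter = []
pt = PeriodicThread(0.02, lambda: counter.append(1))
if hasattr(os, "register_at_fork"):  # POSIX only
    os.register_at_fork(
        before=pt._before_fork,
        after_in_parent=pt._after_fork,
        after_in_child=pt._after_fork,
    )

pt.start()
time.sleep(0.1)
pt._before_fork()  # what the before-fork hook would do
stopped = not pt._thread.is_alive()
pt._after_fork()   # what the after-fork hook would do
```

The `_skip_shutdown` flag in the C++ version exists so that this pre-fork stop does not run the thread's shutdown callback, since the thread is only pausing for the fork, not terminating.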

### ddtrace/internal/_threads.pyi (1 addition, 1 deletion)

```diff
@@ -1,7 +1,6 @@
 import typing as t

 class PeriodicThread:
-    _is_after_fork: bool
     name: str
     ident: int
     interval: float
@@ -20,5 +19,6 @@ class PeriodicThread:
     def awake(self) -> None: ...
     def _atexit(self) -> None: ...
     def _after_fork(self) -> None: ...
+    def _before_fork(self) -> None: ...

 periodic_threads: dict[int, PeriodicThread]
```
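The native code above also replaced the relative `wait(interval)` with a wait on the absolute `_next_call_time` deadline, so each run is scheduled against a monotonic clock and an early wake-up resumes waiting toward the same deadline instead of restarting a full interval. The same idea in pure Python (a sketch; the real scheduling lives in `_threads.cpp` and also services `awake()` requests through an Event):

```python
import time


def run_periodic(action, interval, iterations):
    """Run `action` every `interval` seconds against absolute deadlines
    on the monotonic clock, matching the native _next_call_time logic."""
    next_call = time.monotonic() + interval
    for _ in range(iterations):
        delay = next_call - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        action()
        # Re-anchor the deadline after the action completes, as the
        # native code does when it resets _next_call_time.
        next_call = time.monotonic() + interval


ticks = []
start = time.monotonic()
run_periodic(lambda: ticks.append(time.monotonic() - start), 0.02, 3)
print(len(ticks))  # → 3
```

An absolute deadline also makes the first call after `start()` land one full interval after startup, which is why `PeriodicThread_start` initialises `_next_call_time` before spawning the thread.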

### ddtrace/internal/datadog/profiling/stack/include/sampler.hpp (3 additions, 0 deletions)

```diff
@@ -80,6 +80,9 @@ class Sampler

     // Delegates to the StackRenderer to clear its caches after fork
     void postfork_child();
+
+    // Restart the sampler after fork if it was running
+    void restart_after_fork();
 };

 } // namespace Datadog
```
