Merged
20 changes: 11 additions & 9 deletions libdd-crashtracker/src/collector/counters.rs
@@ -1,7 +1,7 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

-use std::sync::atomic::{AtomicI64, Ordering::SeqCst};
+use std::sync::atomic::{AtomicI64, Ordering};
use thiserror::Error;

#[cfg(unix)]
@@ -48,7 +48,6 @@ impl OpTypes {
#[allow(clippy::declare_interior_mutable_const)]
const ATOMIC_ZERO: AtomicI64 = AtomicI64::new(0);

-// TODO: Is this
static OP_COUNTERS: [AtomicI64; OpTypes::SIZE as usize] = [ATOMIC_ZERO; OpTypes::SIZE as usize];

/// Track that an operation (of type op) has begun.
@@ -58,9 +57,7 @@ static OP_COUNTERS: [AtomicI64; OpTypes::SIZE as usize] = [ATOMIC_ZERO; OpTypes:
/// ATOMICITY:
/// This function is atomic.
pub fn begin_op(op: OpTypes) -> Result<(), CounterError> {
Contributor:

I wonder if this is a bug (even before these changes).

For begin_op:

  1. If old == i64::MAX - 1, the new value becomes i64::MAX, which is still not overflow.
  2. If old == i64::MAX, fetch_add(1, ...) wraps and stores i64::MIN before we can report an error.

For end_op:

If old == 0, we return an error, but the counter has already been decremented to -1.
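
The wrap in case 2 is easy to demonstrate in isolation (a standalone sketch, not the crashtracker code):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

fn main() {
    // Atomic fetch_add uses wrapping arithmetic: incrementing a counter
    // that already holds i64::MAX wraps it to i64::MIN. Any error check
    // performed on the returned old value runs after the wrap happened.
    let c = AtomicI64::new(i64::MAX);
    let old = c.fetch_add(1, Ordering::SeqCst);
    assert_eq!(old, i64::MAX);
    assert_eq!(c.load(Ordering::SeqCst), i64::MIN); // already wrapped
}
```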

Contributor:


But since we use these as diagnostics, and they're not critical for synchronization, I think it's not life-threatening?

Contributor Author (@yannham, Mar 17, 2026):


Hmm, you're onto something. The first one is a classical problem with counters; IIRC there's specific handling in e.g. the implementation of Arc (even if you do it right, there could theoretically be concurrent fetch_adds between your fetch_add and the test, which would overflow). One possibility is to keep a "buffer zone": instead of checking for overflow tightly, you could for example test for old > i64::MAX / 2, and maybe reset to old upon overflow (it's almost impossible in practice that there are i64::MAX / 2 concurrent increments before the test). A clean fix requires an initial load and a compare-exchange, I fear, which is more costly for the 99.99% of code paths where you don't actually overflow.

I'm not sure the second is an issue here though: since the atomic is i64 and the check is old <= 0, it's probably OK for the counter to go far into the negative values; it's considered the same as being 0? Except maybe upon reading/reporting? Though, similarly, you could fix it with a more complex read-then-compare-exchange loop.

By the way, what do you think is a reasonable range for those counters in practice?
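
The read-then-compare-exchange loop mentioned above could be sketched with `fetch_update`, which retries a compare-exchange until the closure's value sticks (hypothetical helper names; this is not the PR's code):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Hypothetical checked increment: the closure returns None when the add
// would overflow, which aborts the CAS loop without writing anything.
fn checked_begin(counter: &AtomicI64) -> Result<(), &'static str> {
    counter
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| old.checked_add(1))
        .map(|_| ())
        .map_err(|_| "counter overflow")
}

// Hypothetical checked decrement: refuses to go below zero, avoiding the
// end_op case where the counter is decremented before the check runs.
fn checked_end(counter: &AtomicI64) -> Result<(), &'static str> {
    counter
        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
            (old > 0).then(|| old - 1)
        })
        .map(|_| ())
        .map_err(|_| "operation not started")
}

fn main() {
    let c = AtomicI64::new(0);
    assert!(checked_end(&c).is_err()); // counter stays at 0, not -1
    assert!(checked_begin(&c).is_ok());
    assert_eq!(c.load(Ordering::Relaxed), 1);

    let full = AtomicI64::new(i64::MAX);
    assert!(checked_begin(&full).is_err()); // no wrap: value untouched
    assert_eq!(full.load(Ordering::Relaxed), i64::MAX);
}
```

The trade-off is as described: every update is now a load plus at least one compare-exchange, even on the happy path.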

Contributor Author (@yannham):


Yeah, Arc is basically pulling the MAX / 2 trick as well: https://doc.rust-lang.org/src/alloc/sync.rs.html#2390-2407
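
A sketch of that buffer-zone idea applied to a counter like these (an assumed helper, not the actual Arc or crashtracker code; Arc itself aborts the process instead of returning an error):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Arc-style guard: increment first, then treat anything past i64::MAX / 2
// as overflow. The huge slack between the threshold and i64::MAX absorbs
// any increments racing in between the fetch_add and the check, so the
// counter can never actually wrap in practice.
fn buffered_begin(counter: &AtomicI64) -> Result<(), &'static str> {
    let old = counter.fetch_add(1, Ordering::Relaxed);
    if old > i64::MAX / 2 {
        counter.fetch_sub(1, Ordering::Relaxed); // best-effort rollback
        return Err("counter overflow");
    }
    Ok(())
}

fn main() {
    let c = AtomicI64::new(0);
    assert!(buffered_begin(&c).is_ok());
    assert_eq!(c.load(Ordering::Relaxed), 1);

    let near_full = AtomicI64::new(i64::MAX / 2 + 1);
    assert!(buffered_begin(&near_full).is_err());
    assert_eq!(near_full.load(Ordering::Relaxed), i64::MAX / 2 + 1); // rolled back
}
```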

Contributor:


Oh, it's a non-issue for crashtracking. Each op will only ever be 0 or 1, technically. I was just thinking about the underlying logic.

Contributor:


Well, I guess the upper bound is the number of threads doing the same op at the same time. Mostly profiling operations.

Contributor Author (@yannham, Mar 17, 2026):


I see. Then indeed overflow is very theoretical, in fact straight up impossible, given that each thread takes up some space for its stack, making the max number of live threads at any point in time much smaller than i64::MAX. But yeah, in general I would say that keeping a safe range (all i64::MAX negative values as slack for underflow, and the values above i64::MAX / 2 as slack for overflow) is a practical way to do it without hurting the happy path. The right way ™️ would be to load first, and only update (with a compare-exchange) if it's indeed not overflowing/underflowing the counter, but this is quite a bit more expensive.

-// TODO: I'm making everything SeqCst for now. Could possibly gain some
-// performance by using a weaker ordering.
-let old = OP_COUNTERS[op as usize].fetch_add(1, SeqCst);
+let old = OP_COUNTERS[op as usize].fetch_add(1, Ordering::Relaxed);
if old == i64::MAX - 1 {
return Err(CounterError::CounterOverflow(op));
}
@@ -72,7 +69,7 @@ pub fn begin_op(op: OpTypes) -> Result<(), CounterError> {
/// PRECONDITIONS: This function assumes that the crash-tracker is initialized.
/// ATOMICITY: This function is atomic.
pub fn end_op(op: OpTypes) -> Result<(), CounterError> {
-let old = OP_COUNTERS[op as usize].fetch_sub(1, SeqCst);
+let old = OP_COUNTERS[op as usize].fetch_sub(1, Ordering::Relaxed);
if old <= 0 {
return Err(CounterError::OperationNotStarted(op));
}
@@ -103,7 +100,12 @@ pub fn emit_counters(w: &mut impl Write) -> Result<(), CounterError> {

writeln!(w, "{DD_CRASHTRACK_BEGIN_COUNTERS}")?;
for (i, c) in OP_COUNTERS.iter().enumerate() {
-writeln!(w, "{{\"{}\": {}}}", OpTypes::name(i)?, c.load(SeqCst))?;
+writeln!(
+    w,
+    "{{\"{}\": {}}}",
+    OpTypes::name(i)?,
+    c.load(Ordering::Relaxed)
+)?;
}
writeln!(w, "{DD_CRASHTRACK_END_COUNTERS}")?;
w.flush()?;
@@ -113,12 +115,12 @@ pub fn emit_counters(w: &mut impl Write) -> Result<(), CounterError> {
/// Resets all counters to 0.
/// Expected to be used after a fork, to reset the counters on the child
/// ATOMICITY:
-/// This is NOT ATOMIC.
+/// The reset of each individual counter is atomic, but the entire reset is NOT.
/// Should only be used when no conflicting updates can occur,
/// e.g. after a fork but before ops start on the child.
pub fn reset_counters() -> Result<(), CounterError> {
for c in OP_COUNTERS.iter() {
-c.store(0, SeqCst);
+c.store(0, Ordering::Relaxed);
}
Ok(())
}