From 2a60ed98577d7868d578b2423af86e33875be4c9 Mon Sep 17 00:00:00 2001 From: Ayan Khan Date: Wed, 22 Apr 2026 14:07:23 -0400 Subject: [PATCH 1/5] fix(telemetry): schedule ExtendedHeartbeat on worker start MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `ExtendedHeartbeat` lifecycle action was present in the scheduler's `delays` catalog (populated at `build_worker`) but was never added to the `deadlines` queue, so its handler was never invoked and `app-extended-heartbeat` payloads were never emitted. `Scheduler::new(delays)` always starts with an empty `deadlines` vec, and `next_deadline()` only ever reads from `deadlines`. Events must be explicitly scheduled via `schedule_event(event)` to actually fire. `Lifecycle(Start)` only scheduled `FlushMetricAggr` and `FlushData`. The `Lifecycle(ExtendedHeartbeat)` handler self-reschedules after its first fire — which meant bootstrapping a chicken-and-egg that never resolved. Fix: schedule `ExtendedHeartbeat` alongside `FlushMetricAggr` and `FlushData` inside `Lifecycle(Start)`. The bug went unnoticed because: - Default `telemetry_extended_heartbeat_interval` is 24h - Prior to #1824 the scheduler used a hardcoded 24h anyway, so it was impossible to shorten the interval in tests - No existing unit / integration test waited long enough (or used a short enough interval) to observe the first extended heartbeat Surfaced by system-tests PR 6338, which adds a `TELEMETRY_EXTENDED_HEARTBEAT` scenario with a 2s interval. All libdatadog-based tracers (PHP, Go, .NET, Java Spring-Boot-3-native, etc.) fail that scenario with `app-extended-heartbeat event not found`. Added unit test `lifecycle_start_schedules_extended_heartbeat` that verifies all three events are scheduled after processing `Lifecycle(Start)`. Fails without the fix, passes with it. Co-Authored-By: Claude Opus 4.7 (1M context) --- libdd-telemetry/src/worker/mod.rs | 50 ++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 67b5f5e742..aa6a2107bb 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -427,6 +427,11 @@ impl TelemetryWorker { self.deadlines .schedule_event(LifecycleAction::FlushData) .unwrap(); + + #[allow(clippy::unwrap_used)] + self.deadlines + .schedule_event(LifecycleAction::ExtendedHeartbeat) + .unwrap(); self.data.started = true; } } @@ -1283,7 +1288,10 @@ mod tests { use crate::worker::http_client::header::{ DD_PARENT_SESSION_ID, DD_ROOT_SESSION_ID, DD_SESSION_ID, }; - use crate::worker::{TelemetryWorker, TelemetryWorkerBuilder, TelemetryWorkerHandle}; + use crate::worker::{ + LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder, + TelemetryWorkerHandle, + }; use libdd_common::{http_common, Endpoint}; use tokio::runtime::Runtime; @@ -1436,6 +1444,46 @@ mod tests { ); } + /// `Lifecycle(Start)` must schedule `ExtendedHeartbeat` alongside the other periodic + /// events. Without an initial schedule the event sits in the scheduler's `delays` + /// catalog but never enters the `deadlines` queue, so its handler is never invoked + /// and `app-extended-heartbeat` payloads are never emitted. + #[tokio::test] + async fn lifecycle_start_schedules_extended_heartbeat() { + let mut b = TelemetryWorkerBuilder::new( + "h".into(), + "svc".into(), + "lang".into(), + "1".into(), + "tv".into(), + ); + b.config + .set_endpoint(Endpoint::from_slice("http://127.0.0.1:1")) + .unwrap(); + b.runtime_id = Some("rid".into()); + let mut worker = b.build_worker(Some(tokio::runtime::Handle::current())).1; + + let _ = worker + .dispatch_action(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await; + + let scheduled: Vec = + worker.deadlines.deadlines.iter().map(|(_, k)| *k).collect(); + + assert!( + scheduled.contains(&LifecycleAction::FlushMetricAggr), + "Start should schedule FlushMetricAggr; scheduled={scheduled:?}", + ); + assert!( + scheduled.contains(&LifecycleAction::FlushData), + "Start should schedule FlushData; scheduled={scheduled:?}", + ); + assert!( + scheduled.contains(&LifecycleAction::ExtendedHeartbeat), + "Start should schedule ExtendedHeartbeat; scheduled={scheduled:?}", + ); + } + mod reset { use super::super::*; use crate::data::{ From a5bd1d986df67d0d4cdd6bff96e9241d34da7fbc Mon Sep 17 00:00:00 2001 From: Ayan Khan Date: Wed, 22 Apr 2026 14:15:04 -0400 Subject: [PATCH 2/5] test(telemetry): generalize Start scheduling test as an invariant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the ExtendedHeartbeat-specific assertion with an invariant test that walks the scheduler's `delays` catalog and asserts every entry is present in `deadlines` after `Lifecycle(Start)`. A specific test would only catch a regression of this exact bug; an invariant test catches the whole class — if a future periodic `LifecycleAction` is added with a delay but nobody schedules it on Start, the test fails with a message naming the forgotten variant. Also add a negative guard for the `MetricsLogs` flavor to lock in its intentional exclusion of lifecycle events like ExtendedHeartbeat, so a future change that starts emitting lifecycle telemetry from the metrics-only worker has to update the test explicitly. Co-Authored-By: Claude Opus 4.7 (1M context) --- libdd-telemetry/src/worker/mod.rs | 63 +++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index aa6a2107bb..598f962f4a 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -1290,7 +1290,7 @@ mod tests { }; use crate::worker::{ LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder, - TelemetryWorkerHandle, + TelemetryWorkerFlavor, TelemetryWorkerHandle, }; use libdd_common::{http_common, Endpoint}; use tokio::runtime::Runtime; @@ -1444,12 +1444,7 @@ mod tests { ); } - /// `Lifecycle(Start)` must schedule `ExtendedHeartbeat` alongside the other periodic - /// events. Without an initial schedule the event sits in the scheduler's `delays` - /// catalog but never enters the `deadlines` queue, so its handler is never invoked - /// and `app-extended-heartbeat` payloads are never emitted. - #[tokio::test] - async fn lifecycle_start_schedules_extended_heartbeat() { + fn build_test_worker_with_flavor(flavor: TelemetryWorkerFlavor) -> TelemetryWorker { let mut b = TelemetryWorkerBuilder::new( "h".into(), "svc".into(), @@ -1461,26 +1456,70 @@ mod tests { .set_endpoint(Endpoint::from_slice("http://127.0.0.1:1")) .unwrap(); b.runtime_id = Some("rid".into()); - let mut worker = b.build_worker(Some(tokio::runtime::Handle::current())).1; + b.flavor = flavor; + b.build_worker(Some(tokio::runtime::Handle::current())).1 + } + + /// Invariant for the `Full` flavor: every event that has a delay entry in the + /// scheduler's `delays` catalog must be scheduled by `Lifecycle(Start)`. An event + /// with a delay but no initial schedule will sit in `delays` forever without ever + /// entering the `deadlines` queue, so its handler never fires — the original + /// `app-extended-heartbeat` bug. + /// + /// Keep this as an invariant (walk `delays`) rather than an enumeration of specific + /// variants so that adding a new periodic `LifecycleAction` can't silently regress. + #[tokio::test] + async fn full_flavor_start_schedules_every_periodic_action() { + let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::Full); let _ = worker .dispatch_action(TelemetryActions::Lifecycle(LifecycleAction::Start)) .await; + let delays: Vec = + worker.deadlines.delays.iter().map(|(_, k)| *k).collect(); + let scheduled: Vec = + worker.deadlines.deadlines.iter().map(|(_, k)| *k).collect(); + + assert!( + !delays.is_empty(), + "test precondition: scheduler should register at least one periodic action", + ); + for ev in &delays { + assert!( + scheduled.contains(ev), + "event {ev:?} has a delay catalog entry but was not scheduled on Start; \ + scheduled={scheduled:?}", + ); + } + } + + /// The `MetricsLogs` flavor intentionally excludes lifecycle events (app-started, + /// heartbeats, extended heartbeats). This is a negative regression guard: if a + /// future change starts emitting lifecycle telemetry from the metrics-only flavor, + /// this test will flag it so the decision is explicit. + #[tokio::test] + async fn metrics_logs_flavor_start_does_not_schedule_extended_heartbeat() { + let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::MetricsLogs); + + let _ = worker + .dispatch_metrics_logs_action(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await; + let scheduled: Vec = worker.deadlines.deadlines.iter().map(|(_, k)| *k).collect(); assert!( scheduled.contains(&LifecycleAction::FlushMetricAggr), - "Start should schedule FlushMetricAggr; scheduled={scheduled:?}", + "MetricsLogs Start should schedule FlushMetricAggr; scheduled={scheduled:?}", ); assert!( scheduled.contains(&LifecycleAction::FlushData), - "Start should schedule FlushData; scheduled={scheduled:?}", + "MetricsLogs Start should schedule FlushData; scheduled={scheduled:?}", ); assert!( - scheduled.contains(&LifecycleAction::ExtendedHeartbeat), - "Start should schedule ExtendedHeartbeat; scheduled={scheduled:?}", + !scheduled.contains(&LifecycleAction::ExtendedHeartbeat), + "MetricsLogs flavor intentionally excludes ExtendedHeartbeat; scheduled={scheduled:?}", ); } From bfa714bfe617f12ffb074c748cce2ee09c82b678 Mon Sep 17 00:00:00 2001 From: Ayan Khan Date: Mon, 27 Apr 2026 11:20:13 -0400 Subject: [PATCH 3/5] test(telemetry): skip new lifecycle scheduling tests under miri MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both tests build a real `TelemetryWorker`. `dispatch_action(Start)` issues an HTTP `app-started` request via reqwest, and the worker's http client itself is constructed via reqwest — neither of which miri supports. The full-flavor invariant test was hanging miri >540s before timing out. Add `#[cfg_attr(miri, ignore)]` matching the pattern used by 360+ other reqwest-touching tests across the repo. Tests still run on regular `cargo test`. Co-Authored-By: Claude Opus 4.7 (1M context) --- libdd-telemetry/src/worker/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 598f962f4a..635e1faa91 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -1469,6 +1469,9 @@ mod tests { /// Keep this as an invariant (walk `delays`) rather than an enumeration of specific /// variants so that adding a new periodic `LifecycleAction` can't silently regress. #[tokio::test] + // dispatch_action(Start) issues an `app-started` HTTP request via the reqwest + // client, which miri cannot run. + #[cfg_attr(miri, ignore)] async fn full_flavor_start_schedules_every_periodic_action() { let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::Full); @@ -1499,6 +1502,8 @@ mod tests { /// future change starts emitting lifecycle telemetry from the metrics-only flavor, /// this test will flag it so the decision is explicit. #[tokio::test] + // build_worker constructs an http client via reqwest, which miri cannot run. + #[cfg_attr(miri, ignore)] async fn metrics_logs_flavor_start_does_not_schedule_extended_heartbeat() { let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::MetricsLogs); From cbe0951ba9356088011986541e8454d399d0d112 Mon Sep 17 00:00:00 2001 From: Ayan Khan Date: Tue, 28 Apr 2026 12:42:14 -0400 Subject: [PATCH 4/5] test(telemetry): trim verbose comments on lifecycle scheduling tests Co-Authored-By: Claude Opus 4.7 (1M context) --- libdd-telemetry/src/worker/mod.rs | 44 +++++++++---------------------- 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 635e1faa91..1916a9846d 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -1460,18 +1460,11 @@ mod tests { b.build_worker(Some(tokio::runtime::Handle::current())).1 } - /// Invariant for the `Full` flavor: every event that has a delay entry in the - /// scheduler's `delays` catalog must be scheduled by `Lifecycle(Start)`. An event - /// with a delay but no initial schedule will sit in `delays` forever without ever - /// entering the `deadlines` queue, so its handler never fires — the original - /// `app-extended-heartbeat` bug. - /// - /// Keep this as an invariant (walk `delays`) rather than an enumeration of specific - /// variants so that adding a new periodic `LifecycleAction` can't silently regress. + /// Every event with a delay must be scheduled on Start; otherwise it sits in + /// `delays` forever and its handler never fires. Walking `delays` (rather than + /// enumerating variants) guards against future periodic actions regressing. #[tokio::test] - // dispatch_action(Start) issues an `app-started` HTTP request via the reqwest - // client, which miri cannot run. - #[cfg_attr(miri, ignore)] + #[cfg_attr(miri, ignore)] // reqwest in dispatch_action async fn full_flavor_start_schedules_every_periodic_action() { let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::Full); @@ -1484,26 +1477,19 @@ mod tests { let scheduled: Vec = worker.deadlines.deadlines.iter().map(|(_, k)| *k).collect(); - assert!( - !delays.is_empty(), - "test precondition: scheduler should register at least one periodic action", - ); + assert!(!delays.is_empty(), "scheduler should have periodic actions"); for ev in &delays { assert!( scheduled.contains(ev), - "event {ev:?} has a delay catalog entry but was not scheduled on Start; \ - scheduled={scheduled:?}", + "{ev:?} has a delay but was not scheduled on Start; scheduled={scheduled:?}", ); } } - /// The `MetricsLogs` flavor intentionally excludes lifecycle events (app-started, - /// heartbeats, extended heartbeats). This is a negative regression guard: if a - /// future change starts emitting lifecycle telemetry from the metrics-only flavor, - /// this test will flag it so the decision is explicit. + /// `MetricsLogs` flavor intentionally excludes lifecycle events. Negative guard + /// so any future change emitting them from this flavor has to update the test. #[tokio::test] - // build_worker constructs an http client via reqwest, which miri cannot run. - #[cfg_attr(miri, ignore)] + #[cfg_attr(miri, ignore)] // reqwest in build_worker async fn metrics_logs_flavor_start_does_not_schedule_extended_heartbeat() { let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::MetricsLogs); @@ -1514,17 +1500,11 @@ mod tests { let scheduled: Vec = worker.deadlines.deadlines.iter().map(|(_, k)| *k).collect(); - assert!( - scheduled.contains(&LifecycleAction::FlushMetricAggr), - "MetricsLogs Start should schedule FlushMetricAggr; scheduled={scheduled:?}", - ); - assert!( - scheduled.contains(&LifecycleAction::FlushData), - "MetricsLogs Start should schedule FlushData; scheduled={scheduled:?}", - ); + assert!(scheduled.contains(&LifecycleAction::FlushMetricAggr)); + assert!(scheduled.contains(&LifecycleAction::FlushData)); assert!( !scheduled.contains(&LifecycleAction::ExtendedHeartbeat), - "MetricsLogs flavor intentionally excludes ExtendedHeartbeat; scheduled={scheduled:?}", + "MetricsLogs should not schedule ExtendedHeartbeat; scheduled={scheduled:?}", ); } From 64ce4b611b317c5231921f1d446498250ef45aeb Mon Sep 17 00:00:00 2001 From: Ayan Khan Date: Tue, 28 Apr 2026 13:05:58 -0400 Subject: [PATCH 5/5] fix(telemetry): don't reset FlushData from ExtendedHeartbeat handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the ExtendedHeartbeat handler called schedule_events([FlushData, ExtendedHeartbeat]). Since schedule_event_with_from removes-and-reinserts the deadline, this replaced FlushData's existing deadline with `now + heartbeat_interval`. When `extended_heartbeat_interval < heartbeat_interval`, each ExtendedHeartbeat firing pushes FlushData out further than the next ExtendedHeartbeat deadline, so FlushData never fires — starving app-heartbeat and observability payload delivery. Fix: only re-schedule self in the ExtendedHeartbeat handler. FlushData already self-reschedules in its own handler; the two timers operate independently. This was latent before #1910 because ExtendedHeartbeat never fired at all. Caught by codex review on the PR. Added regression test `extended_heartbeat_does_not_reset_flush_data` that captures FlushData's deadline before and after dispatching ExtendedHeartbeat and asserts it is unchanged. Verified the test fails without the fix. Co-Authored-By: Claude Opus 4.7 (1M context) --- libdd-telemetry/src/worker/mod.rs | 53 +++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 1916a9846d..dc696ced0f 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -499,15 +499,13 @@ impl TelemetryWorker { Ok(()) => self.payload_sent_success(&extended_hb), Err(err) => self.log_err(&err), } + // Only re-schedule self. Resetting `FlushData` here would replace its + // existing deadline with `now + heartbeat_interval`, starving FlushData + // when `extended_heartbeat_interval < heartbeat_interval` because each + // ExtendedHeartbeat firing pushes FlushData out before it can fire. #[allow(clippy::unwrap_used)] self.deadlines - .schedule_events( - &mut [ - LifecycleAction::FlushData, - LifecycleAction::ExtendedHeartbeat, - ] - .into_iter(), - ) + .schedule_event(LifecycleAction::ExtendedHeartbeat) .unwrap(); } Lifecycle(Stop) => { @@ -1508,6 +1506,47 @@ mod tests { ); } + /// Regression: when `extended_heartbeat_interval < heartbeat_interval`, the + /// ExtendedHeartbeat handler must not reset FlushData's deadline. If it did, each + /// firing would push FlushData to `now + heartbeat_interval` and the next + /// (sooner) ExtendedHeartbeat would push it again — starving FlushData forever. + #[tokio::test] + #[cfg_attr(miri, ignore)] // reqwest in dispatch_action + async fn extended_heartbeat_does_not_reset_flush_data() { + let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::Full); + + let _ = worker + .dispatch_action(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await; + + let flush_data_before = worker + .deadlines + .deadlines + .iter() + .find(|(_, k)| *k == LifecycleAction::FlushData) + .map(|(d, _)| *d) + .expect("FlushData scheduled on Start"); + + let _ = worker + .dispatch_action(TelemetryActions::Lifecycle( + LifecycleAction::ExtendedHeartbeat, + )) + .await; + + let flush_data_after = worker + .deadlines + .deadlines + .iter() + .find(|(_, k)| *k == LifecycleAction::FlushData) + .map(|(d, _)| *d) + .expect("FlushData should still be scheduled after ExtendedHeartbeat fires"); + + assert_eq!( + flush_data_before, flush_data_after, + "ExtendedHeartbeat must not reset FlushData's deadline", + ); + } + mod reset { use super::super::*; use crate::data::{