diff --git a/libdd-telemetry/src/worker/mod.rs b/libdd-telemetry/src/worker/mod.rs index 67b5f5e742..dc696ced0f 100644 --- a/libdd-telemetry/src/worker/mod.rs +++ b/libdd-telemetry/src/worker/mod.rs @@ -427,6 +427,11 @@ impl TelemetryWorker { self.deadlines .schedule_event(LifecycleAction::FlushData) .unwrap(); + + #[allow(clippy::unwrap_used)] + self.deadlines + .schedule_event(LifecycleAction::ExtendedHeartbeat) + .unwrap(); self.data.started = true; } } @@ -494,15 +499,13 @@ impl TelemetryWorker { Ok(()) => self.payload_sent_success(&extended_hb), Err(err) => self.log_err(&err), } + // Only re-schedule self. Resetting `FlushData` here would replace its + // existing deadline with `now + heartbeat_interval`, starving FlushData + // when `extended_heartbeat_interval < heartbeat_interval` because each + // ExtendedHeartbeat firing pushes FlushData out before it can fire. #[allow(clippy::unwrap_used)] self.deadlines - .schedule_events( - &mut [ - LifecycleAction::FlushData, - LifecycleAction::ExtendedHeartbeat, - ] - .into_iter(), - ) + .schedule_event(LifecycleAction::ExtendedHeartbeat) .unwrap(); } Lifecycle(Stop) => { @@ -1283,7 +1286,10 @@ mod tests { use crate::worker::http_client::header::{ DD_PARENT_SESSION_ID, DD_ROOT_SESSION_ID, DD_SESSION_ID, }; - use crate::worker::{TelemetryWorker, TelemetryWorkerBuilder, TelemetryWorkerHandle}; + use crate::worker::{ + LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder, + TelemetryWorkerFlavor, TelemetryWorkerHandle, + }; use libdd_common::{http_common, Endpoint}; use tokio::runtime::Runtime; @@ -1436,6 +1442,111 @@ mod tests { ); } + fn build_test_worker_with_flavor(flavor: TelemetryWorkerFlavor) -> TelemetryWorker { + let mut b = TelemetryWorkerBuilder::new( + "h".into(), + "svc".into(), + "lang".into(), + "1".into(), + "tv".into(), + ); + b.config + .set_endpoint(Endpoint::from_slice("http://127.0.0.1:1")) + .unwrap(); + b.runtime_id = Some("rid".into()); + b.flavor = flavor; + 
b.build_worker(Some(tokio::runtime::Handle::current())).1 + } + + /// Every event with a delay must be scheduled on Start; otherwise it sits in + /// `delays` forever and its handler never fires. Walking `delays` (rather than + /// enumerating variants) guards against future periodic actions regressing. + #[tokio::test] + #[cfg_attr(miri, ignore)] // reqwest in dispatch_action + async fn full_flavor_start_schedules_every_periodic_action() { + let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::Full); + + let _ = worker + .dispatch_action(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await; + + let delays: Vec<LifecycleAction> = + worker.deadlines.delays.iter().map(|(_, k)| *k).collect(); + let scheduled: Vec<LifecycleAction> = + worker.deadlines.deadlines.iter().map(|(_, k)| *k).collect(); + + assert!(!delays.is_empty(), "scheduler should have periodic actions"); + for ev in &delays { + assert!( + scheduled.contains(ev), + "{ev:?} has a delay but was not scheduled on Start; scheduled={scheduled:?}", + ); + } + } + + /// `MetricsLogs` flavor intentionally excludes lifecycle events. Negative guard + /// so any future change emitting them from this flavor has to update the test. 
+ #[tokio::test] + #[cfg_attr(miri, ignore)] // reqwest in build_worker + async fn metrics_logs_flavor_start_does_not_schedule_extended_heartbeat() { + let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::MetricsLogs); + + let _ = worker + .dispatch_metrics_logs_action(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await; + + let scheduled: Vec<LifecycleAction> = + worker.deadlines.deadlines.iter().map(|(_, k)| *k).collect(); + + assert!(scheduled.contains(&LifecycleAction::FlushMetricAggr)); + assert!(scheduled.contains(&LifecycleAction::FlushData)); + assert!( + !scheduled.contains(&LifecycleAction::ExtendedHeartbeat), + "MetricsLogs should not schedule ExtendedHeartbeat; scheduled={scheduled:?}", + ); + } + + /// Regression: when `extended_heartbeat_interval < heartbeat_interval`, the + /// ExtendedHeartbeat handler must not reset FlushData's deadline. If it did, each + /// firing would push FlushData to `now + heartbeat_interval` and the next + /// (sooner) ExtendedHeartbeat would push it again — starving FlushData forever. 
+ #[tokio::test] + #[cfg_attr(miri, ignore)] // reqwest in dispatch_action + async fn extended_heartbeat_does_not_reset_flush_data() { + let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::Full); + + let _ = worker + .dispatch_action(TelemetryActions::Lifecycle(LifecycleAction::Start)) + .await; + + let flush_data_before = worker + .deadlines + .deadlines + .iter() + .find(|(_, k)| *k == LifecycleAction::FlushData) + .map(|(d, _)| *d) + .expect("FlushData scheduled on Start"); + + let _ = worker + .dispatch_action(TelemetryActions::Lifecycle( + LifecycleAction::ExtendedHeartbeat, + )) + .await; + + let flush_data_after = worker + .deadlines + .deadlines + .iter() + .find(|(_, k)| *k == LifecycleAction::FlushData) + .map(|(d, _)| *d) + .expect("FlushData should still be scheduled after ExtendedHeartbeat fires"); + + assert_eq!( + flush_data_before, flush_data_after, + "ExtendedHeartbeat must not reset FlushData's deadline", + ); + } + mod reset { use super::super::*; use crate::data::{