From 870a06dc5ce855184a965a22926cc4e3c224892b Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Nov 2025 08:59:34 -0500 Subject: [PATCH 1/6] fix(internal_logs source): remove rate limit --- src/trace.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/trace.rs b/src/trace.rs index 4cfd445d1fff1..6ecec804569ed 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -64,9 +64,10 @@ pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) let metrics_layer = metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO)); - let broadcast_layer = RateLimitedLayer::new(BroadcastLayer::new()) - .with_default_limit(internal_log_rate_limit) - .with_filter(fmt_filter.clone()); + // BroadcastLayer should NOT be rate limited because it feeds the internal_logs source, + // which users rely on to capture ALL internal Vector logs for debugging/monitoring. + // Console output (stdout/stderr) has its own separate rate limiting below. + let broadcast_layer = BroadcastLayer::new().with_filter(fmt_filter.clone()); let subscriber = tracing_subscriber::registry() .with(metrics_layer) From ca568b5ff4ada78e7286a5851eaa17f76314717f Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Nov 2025 09:55:33 -0500 Subject: [PATCH 2/6] add changelog --- changelog.d/internal_logs_rate_limiting.fix.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changelog.d/internal_logs_rate_limiting.fix.md diff --git a/changelog.d/internal_logs_rate_limiting.fix.md b/changelog.d/internal_logs_rate_limiting.fix.md new file mode 100644 index 0000000000000..d6276794bdaf1 --- /dev/null +++ b/changelog.d/internal_logs_rate_limiting.fix.md @@ -0,0 +1,4 @@ +The `internal_logs` source now captures all internal Vector logs without rate limiting. Previously, repeated log messages were silently +dropped. + +authors: pront From c7b9ec9c74ef31d235df08edc4681d7336c9cc1e Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Nov 2025 11:46:10 -0500 Subject: [PATCH 3/6] Debug commit --- src/app.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/app.rs b/src/app.rs index 9e8a415608a8f..d3e7e4ed25a73 100644 --- a/src/app.rs +++ b/src/app.rs @@ -308,6 +308,19 @@ impl StartedApplication { let mut signal_handler = signals.handler; let mut signal_rx = signals.receiver; + // Keep producing burst of logs for different components + std::thread::spawn(|| { + loop { + + for i in 0..10 { + for _ in 0..10 { + info!(message = "Test log message", component_id = i); + } + } + std::thread::sleep(Duration::from_secs(1)); + } + }); + let signal = loop { let has_sources = !topology_controller.lock().await.topology.config.is_empty(); tokio::select! { From 7772026dad95d7e3a9e3811ec461bae234de0c29 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Nov 2025 14:00:26 -0500 Subject: [PATCH 4/6] Fix validated - Revert "Debug commit" This reverts commit c7b9ec9c74ef31d235df08edc4681d7336c9cc1e. --- src/app.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/app.rs b/src/app.rs index d3e7e4ed25a73..9e8a415608a8f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -308,19 +308,6 @@ impl StartedApplication { let mut signal_handler = signals.handler; let mut signal_rx = signals.receiver; - // Keep producing burst of logs for different components - std::thread::spawn(|| { - loop { - - for i in 0..10 { - for _ in 0..10 { - info!(message = "Test log message", component_id = i); - } - } - std::thread::sleep(Duration::from_secs(1)); - } - }); - let signal = loop { let has_sources = !topology_controller.lock().await.topology.config.is_empty(); tokio::select! { From 74f07e26e1c10f3db43f38555ec039d52038dd4f Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Nov 2025 14:20:58 -0500 Subject: [PATCH 5/6] add unit test --- src/sources/internal_logs.rs | 40 ++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/sources/internal_logs.rs b/src/sources/internal_logs.rs index a5a3d7ff2932b..cd1f29bf82d01 100644 --- a/src/sources/internal_logs.rs +++ b/src/sources/internal_logs.rs @@ -213,6 +213,8 @@ mod tests { use vector_lib::{event::Value, lookup::OwnedTargetPath}; use vrl::value::kind::Collection; + use serial_test::serial; + use super::*; use crate::{ event::Event, @@ -234,6 +236,7 @@ mod tests { // cases because `consume_early_buffer` (called within the // `start_source` helper) panics when called more than once. #[tokio::test] + #[serial] async fn receives_logs() { trace::init(false, false, "debug", 10); trace::reset_early_buffer(); @@ -341,6 +344,43 @@ mod tests { rx } + // NOTE: This test requires #[serial] because it directly interacts with global tracing state. + // This is a pre-existing limitation around tracing initialization in tests. + #[tokio::test] + #[serial] + async fn repeated_logs_are_not_rate_limited() { + trace::init(false, false, "info", 10); + trace::reset_early_buffer(); + + let rx = start_source().await; + + // Generate 20 identical log messages with the same component_id + for _ in 0..20 { + info!(component_id = "test", "Repeated test message"); + } + + sleep(Duration::from_millis(50)).await; + let events = collect_ready(rx).await; + + // Filter to only our test messages + let test_events: Vec<_> = events + .iter() + .filter(|e| { + e.as_log() + .get("message") + .map(|m| m.to_string_lossy() == "Repeated test message") + .unwrap_or(false) + }) + .collect(); + + // We should receive all 20 messages, no rate limiting. + assert_eq!( + test_events.len(), + 20, + "internal_logs source should capture all repeated messages without rate limiting" + ); + } + #[test] fn output_schema_definition_vector_namespace() { let config = InternalLogsConfig::default(); From f581ed0dbb7fe6adc08875a0c14156e127d62904 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 12 Nov 2025 15:35:06 -0500 Subject: [PATCH 6/6] fix check-events --- src/sources/internal_logs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sources/internal_logs.rs b/src/sources/internal_logs.rs index cd1f29bf82d01..e7873a7e8a560 100644 --- a/src/sources/internal_logs.rs +++ b/src/sources/internal_logs.rs @@ -356,7 +356,7 @@ mod tests { // Generate 20 identical log messages with the same component_id for _ in 0..20 { - info!(component_id = "test", "Repeated test message"); + info!(component_id = "test", "Repeated test message."); } sleep(Duration::from_millis(50)).await; @@ -368,7 +368,7 @@ mod tests { .filter(|e| { e.as_log() .get("message") - .map(|m| m.to_string_lossy() == "Repeated test message") + .map(|m| m.to_string_lossy() == "Repeated test message.") .unwrap_or(false) }) .collect();