Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/internal_logs_rate_limiting.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
The `internal_logs` source now captures all internal Vector logs without rate limiting. Previously, repeated log messages were silently
dropped.

authors: pront
40 changes: 40 additions & 0 deletions src/sources/internal_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ mod tests {
use vector_lib::{event::Value, lookup::OwnedTargetPath};
use vrl::value::kind::Collection;

use serial_test::serial;

use super::*;
use crate::{
event::Event,
Expand All @@ -234,6 +236,7 @@ mod tests {
// cases because `consume_early_buffer` (called within the
// `start_source` helper) panics when called more than once.
#[tokio::test]
#[serial]
async fn receives_logs() {
trace::init(false, false, "debug", 10);
trace::reset_early_buffer();
Expand Down Expand Up @@ -341,6 +344,43 @@ mod tests {
rx
}

// NOTE: This test requires #[serial] because it directly interacts with global tracing state.
// This is a pre-existing limitation around tracing initialization in tests.
#[tokio::test]
#[serial]
async fn repeated_logs_are_not_rate_limited() {
trace::init(false, false, "info", 10);
trace::reset_early_buffer();

let rx = start_source().await;

// Generate 20 identical log messages with the same component_id
for _ in 0..20 {
info!(component_id = "test", "Repeated test message.");
}

sleep(Duration::from_millis(50)).await;
let events = collect_ready(rx).await;

// Filter to only our test messages
let test_events: Vec<_> = events
.iter()
.filter(|e| {
e.as_log()
.get("message")
.map(|m| m.to_string_lossy() == "Repeated test message.")
.unwrap_or(false)
})
.collect();

// We should receive all 20 messages, no rate limiting.
assert_eq!(
test_events.len(),
20,
"internal_logs source should capture all repeated messages without rate limiting"
);
}

#[test]
fn output_schema_definition_vector_namespace() {
let config = InternalLogsConfig::default();
Expand Down
7 changes: 4 additions & 3 deletions src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64)
let metrics_layer =
metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));

let broadcast_layer = RateLimitedLayer::new(BroadcastLayer::new())
.with_default_limit(internal_log_rate_limit)
.with_filter(fmt_filter.clone());
// BroadcastLayer should NOT be rate limited because it feeds the internal_logs source,
// which users rely on to capture ALL internal Vector logs for debugging/monitoring.
// Console output (stdout/stderr) has its own separate rate limiting below.
let broadcast_layer = BroadcastLayer::new().with_filter(fmt_filter.clone());

let subscriber = tracing_subscriber::registry()
.with(metrics_layer)
Expand Down
Loading