diff --git a/Cargo.lock b/Cargo.lock index faeb44eef1e6d..82386342d1618 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4875,7 +4875,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.10", "tokio", "tower-service", "tracing 0.1.41", @@ -8061,7 +8061,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck 0.5.0", - "itertools 0.11.0", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -8107,7 +8107,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2 1.0.101", "quote 1.0.40", "syn 2.0.106", @@ -9439,6 +9439,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "scc" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc" +dependencies = [ + "sdd", +] + [[package]] name = "schannel" version = "0.1.27" @@ -9494,6 +9503,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sdd" +version = "3.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" + [[package]] name = "seahash" version = "4.1.0" @@ -9803,6 +9818,31 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "serial_test" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9" +dependencies = [ + "futures 0.3.31", + "log", + "once_cell", + "parking_lot 0.12.4", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" +dependencies = [ + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", +] + [[package]] name = "sha-1" version = "0.10.1" @@ -11445,6 +11485,7 @@ dependencies = [ "criterion", "dashmap", "mock_instant", + "serial_test", "tracing 0.1.41", "tracing-core 0.1.33", "tracing-subscriber", diff --git a/changelog.d/internal_log_component_id_field.breaking.md b/changelog.d/internal_log_component_id_field.breaking.md new file mode 100644 index 0000000000000..5237aa144c86c --- /dev/null +++ b/changelog.d/internal_log_component_id_field.breaking.md @@ -0,0 +1,4 @@ +Vector's internal topology logs now use the `component_id` field name instead of `component` or `key`. +If you are monitoring or filtering Vector's internal logs based on these field names, update your queries to use `component_id`. + +authors: pront diff --git a/docs/DEVELOPING.md b/docs/DEVELOPING.md index ed459429ef6e6..f32965244c28f 100644 --- a/docs/DEVELOPING.md +++ b/docs/DEVELOPING.md @@ -14,6 +14,7 @@ - [Minimum Supported Rust Version](#minimum-supported-rust-version) - [Guidelines](#guidelines) - [Sink healthchecks](#sink-healthchecks) + - [Disabling internal log rate limiting](#disabling-internal-log-rate-limiting) - [Testing](#testing) - [Unit tests](#unit-tests) - [Integration tests](#integration-tests) @@ -328,6 +329,28 @@ that fall into a false negative circumstance. Our goal should be to minimize the likelihood of users needing to pull that lever while still making a good effort to detect common problems. +### Disabling internal log rate limiting + +Vector rate limits its own internal logs by default (10-second windows). During development, you may want to see all log occurrences. + +**Globally** (CLI flag or environment variable): + +```bash +vector --config vector.yaml -r 1 +# or +VECTOR_INTERNAL_LOG_RATE_LIMIT=1 vector --config vector.yaml +``` + +**Per log statement**: + +```rust +// Disable rate limiting for this log +warn!(message = "Error occurred.", %error, internal_log_rate_limit = false); + +// Override rate limit window to 1 second +info!(message = "Processing batch.", batch_size, internal_log_rate_secs = 1); +``` + ## Testing Testing is very important since Vector's primary design principle is reliability. diff --git a/lib/tracing-limit/Cargo.toml b/lib/tracing-limit/Cargo.toml index e6252ea3e6b45..5baef4a14f6f1 100644 --- a/lib/tracing-limit/Cargo.toml +++ b/lib/tracing-limit/Cargo.toml @@ -16,6 +16,7 @@ criterion = "0.7" tracing = "0.1.34" mock_instant = { version = "0.6" } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } +serial_test = { version = "3.2" } [[bench]] name = "limit" diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index 81bea3dc33485..62113f36ce7cc 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -1,4 +1,99 @@ #![deny(warnings)] +//! Rate limiting for tracing events. +//! +//! This crate provides a tracing-subscriber layer that rate limits log events to prevent +//! log flooding. Events are grouped by their callsite and contextual fields, with each +//! unique combination rate limited independently. +//! +//! # How it works +//! +//! Within each rate limit window (default 10 seconds): +//! - **1st occurrence**: Event is emitted normally +//! - **2nd occurrence**: Emits a "suppressing" warning +//! - **3rd+ occurrences**: Silent until window expires +//! - **After window**: Emits a summary of suppressed count, then next event normally +//! +//! # Rate limit grouping +//! +//! Events are rate limited independently based on a combination of: +//! - **Callsite**: The code location where the log statement appears +//! - **Contextual fields**: Any fields attached to the event or its parent spans +//! +//! ## How fields contribute to grouping +//! +//! **Only these fields create distinct rate limit groups:** +//! - `component_id` - Different components are rate limited independently +//! +//! **All other fields are ignored for grouping**, including: +//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid resource/cost implications from high-cardinality tags +//! - `message` - The log message itself doesn't differentiate groups +//! - `internal_log_rate_limit` - Control field for enabling/disabling rate limiting +//! - `internal_log_rate_secs` - Control field for customizing the rate limit window +//! - Any custom fields you add +//! +//! ## Examples +//! +//! ```rust,ignore +//! // Example 1: Different component_id values create separate rate limit groups +//! info!(component_id = "transform_1", "Processing event"); // Group A +//! info!(component_id = "transform_2", "Processing event"); // Group B +//! // Even though the message is identical, these are rate limited independently +//! +//! // Example 2: Only component_id matters for grouping +//! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C +//! info!(component_id = "router", fanout_id = "output_2", "Routing event"); // Group C (same group!) +//! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C (same group!) +//! info!(component_id = "router", fanout_id = "output_1", input_id = "kafka", "Routing event"); // Group C (same!) +//! // All of these share the same group because they have the same component_id +//! // The fanout_id and input_id fields are ignored to avoid resource/cost implications +//! +//! // Example 3: Span fields contribute to grouping +//! let span = info_span!("process", component_id = "transform_1"); +//! let _enter = span.enter(); +//! info!("Processing event"); // Group E: callsite + component_id from span +//! drop(_enter); +//! +//! let span = info_span!("process", component_id = "transform_2"); +//! let _enter = span.enter(); +//! info!("Processing event"); // Group F: same callsite but different component_id +//! +//! // Example 4: Nested spans - child span fields take precedence +//! let outer = info_span!("outer", component_id = "parent"); +//! let _outer_guard = outer.enter(); +//! let inner = info_span!("inner", component_id = "child"); +//! let _inner_guard = inner.enter(); +//! info!("Nested event"); // Grouped by component_id = "child" +//! +//! // Example 5: Same callsite with no fields = single rate limit group +//! info!("Simple message"); // Group G +//! info!("Simple message"); // Group G +//! info!("Simple message"); // Group G +//! +//! // Example 6: Custom fields are ignored for grouping +//! info!(component_id = "source", input_id = "in_1", "Received data"); // Group H +//! info!(component_id = "source", input_id = "in_2", "Received data"); // Group H (same group!) +//! // The input_id field is ignored - only component_id matters +//! +//! // Example 7: Disabling rate limiting for specific logs +//! // Rate limiting is ON by default - explicitly disable for important logs +//! warn!( +//! component_id = "critical_component", +//! message = "Fatal error occurred", +//! internal_log_rate_limit = false +//! ); +//! // This event will NEVER be rate limited, regardless of how often it fires +//! +//! // Example 8: Custom rate limit window for specific events +//! info!( +//! component_id = "noisy_component", +//! message = "Frequent status update", +//! internal_log_rate_secs = 60 // Only log once per minute +//! ); +//! // Override the default window for this specific log +//! ``` +//! +//! This ensures logs from different components are rate limited independently, +//! while avoiding resource/cost implications from high-cardinality tags. use std::fmt; @@ -29,7 +124,6 @@ const MESSAGE_FIELD: &str = "message"; // These fields will cause events to be independently rate limited by the values // for these keys const COMPONENT_ID_FIELD: &str = "component_id"; -const VRL_POSITION: &str = "vrl_position"; #[derive(Eq, PartialEq, Hash, Clone)] struct RateKeyIdentifier { @@ -62,6 +156,13 @@ where } } + /// Sets the default rate limit window in seconds. + /// + /// This controls how long logs are suppressed before they can be emitted again. + /// Within each window: + /// - 1st occurrence: Emitted normally + /// - 2nd occurrence: Shows "suppressing" warning + /// - 3rd+ occurrences: Silent until window expires pub fn with_default_limit(mut self, internal_log_rate_limit: u64) -> Self { self.internal_log_rate_limit = internal_log_rate_limit; self @@ -124,12 +225,12 @@ where } fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { - // Visit the event, grabbing the limit status if one is defined. If we can't find a rate limit field, or the rate limit - // is set as false, then we let it pass through untouched. + // Visit the event, grabbing the limit status if one is defined. Rate limiting is ON by default + // unless explicitly disabled by setting `internal_log_rate_limit = false`. let mut limit_visitor = LimitVisitor::default(); event.record(&mut limit_visitor); - let limit_exists = limit_visitor.limit.unwrap_or(false); + let limit_exists = limit_visitor.limit.unwrap_or(true); if !limit_exists { return self.inner.on_event(event, ctx); } @@ -139,13 +240,25 @@ where None => self.internal_log_rate_limit, }; - // Visit all of the spans in the scope of this event, looking for specific fields that we use to differentiate - // rate-limited events. This ensures that we don't rate limit an event's _callsite_, but the specific usage of a - // callsite, since multiple copies of the same component could be running, etc. + // Build a composite key from event fields and span context to determine the rate limit group. + // This multi-step process ensures we capture all relevant contextual information: + // + // 1. Start with event-level fields (e.g., fields directly on the log macro call) + // 2. Walk up the span hierarchy from root to current span + // 3. Merge in fields from each span, with child spans taking precedence + // + // This means an event's rate limit group is determined by the combination of: + // - Its callsite (handled separately via RateKeyIdentifier) + // - All contextual fields from both the event and its span ancestry + // + // Example: The same `info!("msg")` callsite in different component contexts becomes + // distinct rate limit groups, allowing fine-grained control over log flooding. let rate_limit_key_values = { let mut keys = RateLimitedSpanKeys::default(); + // Capture fields directly on this event event.record(&mut keys); + // Walk span hierarchy and merge in contextual fields ctx.lookup_current() .into_iter() .flat_map(|span| span.scope().from_root()) @@ -319,6 +432,18 @@ enum TraceValue { Bool(bool), } +#[cfg(test)] +impl fmt::Display for TraceValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TraceValue::String(s) => write!(f, "{}", s), + TraceValue::Int(i) => write!(f, "{}", i), + TraceValue::Uint(u) => write!(f, "{}", u), + TraceValue::Bool(b) => write!(f, "{}", b), + } + } +} + impl From for TraceValue { fn from(b: bool) -> Self { TraceValue::Bool(b) @@ -343,21 +468,28 @@ impl From for TraceValue { } } -/// RateLimitedSpanKeys records span keys that we use to rate limit callsites separately by. For -/// example, if a given trace callsite is called from two different components, then they will be -/// rate limited separately. +/// RateLimitedSpanKeys records span and event fields that differentiate rate limit groups. +/// +/// This struct is used to build a composite key that uniquely identifies a rate limit bucket. +/// Events with different field values will be rate limited independently, even if they come +/// from the same callsite. +/// +/// ## Field categories: +/// +/// **Tracked fields** (only these create distinct rate limit groups): +/// - `component_id` - Different components are rate limited independently +/// +/// **Ignored fields**: All other fields are ignored for grouping purposes. This avoids resource/cost implications from high-cardinality tags. +/// ``` #[derive(Default, Eq, PartialEq, Hash, Clone)] struct RateLimitedSpanKeys { component_id: Option, - vrl_position: Option, } impl RateLimitedSpanKeys { fn record(&mut self, field: &Field, value: TraceValue) { - match field.name() { - COMPONENT_ID_FIELD => self.component_id = Some(value), - VRL_POSITION => self.vrl_position = Some(value), - _ => {} + if field.name() == COMPONENT_ID_FIELD { + self.component_id = Some(value); } } @@ -365,9 +497,6 @@ impl RateLimitedSpanKeys { if let Some(component_id) = &other.component_id { self.component_id = Some(component_id.clone()); } - if let Some(vrl_position) = &other.vrl_position { - self.vrl_position = Some(vrl_position.clone()); - } } } @@ -445,26 +574,115 @@ impl Visit for MessageVisitor { #[cfg(test)] mod test { use std::{ - sync::{Arc, LazyLock, Mutex}, + collections::BTreeMap, + sync::{Arc, Mutex}, time::Duration, }; use mock_instant::global::MockClock; + use serial_test::serial; use tracing_subscriber::layer::SubscriberExt; - static TRACING_DEFAULT_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); - use super::*; + #[derive(Debug, Clone, PartialEq, Eq)] + struct RecordedEvent { + message: String, + fields: BTreeMap, + } + + impl RecordedEvent { + fn new(message: impl Into) -> Self { + Self { + message: message.into(), + fields: BTreeMap::new(), + } + } + + fn with_field(mut self, key: impl Into, value: impl Into) -> Self { + self.fields.insert(key.into(), value.into()); + self + } + } + + /// Macro to create RecordedEvent with optional fields + /// Usage: + /// - `event!("message")` - just message + /// - `event!("message", key1: "value1")` - message with one field + /// - `event!("message", key1: "value1", key2: "value2")` - message with multiple fields + macro_rules! event { + ($msg:expr) => { + RecordedEvent::new($msg) + }; + ($msg:expr, $($key:ident: $value:expr),+ $(,)?) => { + RecordedEvent::new($msg) + $(.with_field(stringify!($key), $value))+ + }; + } + + #[derive(Default)] + struct AllFieldsVisitor { + fields: BTreeMap, + } + + impl Visit for AllFieldsVisitor { + fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { + self.fields + .insert(field.name().to_string(), format!("{value:?}")); + } + + fn record_str(&mut self, field: &Field, value: &str) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_i64(&mut self, field: &Field, value: i64) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_u64(&mut self, field: &Field, value: u64) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + + fn record_bool(&mut self, field: &Field, value: bool) { + self.fields + .insert(field.name().to_string(), value.to_string()); + } + } + + impl AllFieldsVisitor { + fn into_event(self) -> RecordedEvent { + let message = self + .fields + .get("message") + .cloned() + .unwrap_or_else(|| String::from("")); + + let mut fields = BTreeMap::new(); + for (key, value) in self.fields { + if key != "message" + && key != "internal_log_rate_limit" + && key != "internal_log_rate_secs" + { + fields.insert(key, value); + } + } + + RecordedEvent { message, fields } + } + } + #[derive(Default)] struct RecordingLayer { - events: Arc>>, + events: Arc>>, _subscriber: std::marker::PhantomData, } impl RecordingLayer { - fn new(events: Arc>>) -> Self { + fn new(events: Arc>>) -> Self { RecordingLayer { events, @@ -485,27 +703,50 @@ mod test { true } - fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { - let mut visitor = MessageVisitor::default(); + fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { + let mut visitor = AllFieldsVisitor::default(); event.record(&mut visitor); + // Also capture fields from span context + if let Some(span) = ctx.lookup_current() { + for span_ref in span.scope().from_root() { + let extensions = span_ref.extensions(); + if let Some(span_keys) = extensions.get::() { + // Add component_id + if let Some(TraceValue::String(ref s)) = span_keys.component_id { + visitor.fields.insert("component_id".to_string(), s.clone()); + } + } + } + } + let mut events = self.events.lock().unwrap(); - events.push(visitor.message.unwrap_or_default()); + events.push(visitor.into_event()); } } - #[test] - fn rate_limits() { - let _guard = TRACING_DEFAULT_LOCK.lock().unwrap(); - - let events: Arc>> = Default::default(); - + /// Helper function to set up a test with a rate-limited subscriber. + /// Returns the events Arc for asserting on collected events. + fn setup_test( + default_limit: u64, + ) -> ( + Arc>>, + impl Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, + ) { + let events: Arc>> = Default::default(); let recorder = RecordingLayer::new(Arc::clone(&events)); let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + .with(RateLimitedLayer::new(recorder).with_default_limit(default_limit)); + (events, sub) + } + + #[test] + #[serial] + fn rate_limits() { + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { for _ in 0..21 { - info!(message = "Hello world!", internal_log_rate_limit = true); + info!(message = "Hello world!"); MockClock::advance(Duration::from_millis(100)); } }); @@ -515,80 +756,119 @@ mod test { assert_eq!( *events, vec![ - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - "Internal log [Hello world!] has been suppressed 9 times.", - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - "Internal log [Hello world!] has been suppressed 9 times.", - "Hello world!", + event!("Hello world!"), + event!("Internal log [Hello world!] is being suppressed to avoid flooding."), + event!("Internal log [Hello world!] has been suppressed 9 times."), + event!("Hello world!"), + event!("Internal log [Hello world!] is being suppressed to avoid flooding."), + event!("Internal log [Hello world!] has been suppressed 9 times."), + event!("Hello world!"), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } #[test] + #[serial] fn override_rate_limit_at_callsite() { - let _guard = TRACING_DEFAULT_LOCK.lock().unwrap(); + let (events, sub) = setup_test(100); + tracing::subscriber::with_default(sub, || { + for _ in 0..31 { + info!(message = "Hello world!", internal_log_rate_secs = 2); + MockClock::advance(Duration::from_millis(100)); + } + }); - let events: Arc>> = Default::default(); + let events = events.lock().unwrap(); - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(100)); + // With a 2-second window and 100ms advances, we get: + // - Event every 20 iterations (2000ms / 100ms = 20) + // - First window: iteration 0-19 (suppressed 19 times after first 2) + // - Second window: iteration 20-39 (but we only go to 30) + assert_eq!( + *events, + vec![ + event!("Hello world!"), + event!("Internal log [Hello world!] is being suppressed to avoid flooding."), + event!("Internal log [Hello world!] has been suppressed 19 times."), + event!("Hello world!"), + event!("Internal log [Hello world!] is being suppressed to avoid flooding."), + ] + ); + } + + #[test] + #[serial] + fn rate_limit_by_event_key() { + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { for _ in 0..21 { - info!( - message = "Hello world!", - internal_log_rate_limit = true, - internal_log_rate_secs = 1 - ); + for key in &["foo", "bar"] { + info!( + message = format!("Hello {key}!").as_str(), + component_id = &key + ); + } MockClock::advance(Duration::from_millis(100)); } }); let events = events.lock().unwrap(); + // Events with different component_id values create separate rate limit groups assert_eq!( *events, vec![ - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - "Internal log [Hello world!] has been suppressed 9 times.", - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - "Internal log [Hello world!] has been suppressed 9 times.", - "Hello world!", + event!("Hello foo!", component_id: "foo"), + event!("Hello bar!", component_id: "bar"), + event!("Internal log [Hello foo!] is being suppressed to avoid flooding."), + event!("Internal log [Hello bar!] is being suppressed to avoid flooding."), + event!("Internal log [Hello foo!] has been suppressed 9 times."), + event!("Hello foo!", component_id: "foo"), + event!("Internal log [Hello bar!] has been suppressed 9 times."), + event!("Hello bar!", component_id: "bar"), + event!("Internal log [Hello foo!] is being suppressed to avoid flooding."), + event!("Internal log [Hello bar!] is being suppressed to avoid flooding."), + event!("Internal log [Hello foo!] has been suppressed 9 times."), + event!("Hello foo!", component_id: "foo"), + event!("Internal log [Hello bar!] has been suppressed 9 times."), + event!("Hello bar!", component_id: "bar"), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } #[test] - fn rate_limit_by_span_key() { - let _guard = TRACING_DEFAULT_LOCK.lock().unwrap(); + #[serial] + fn disabled_rate_limit() { + let (events, sub) = setup_test(1); + tracing::subscriber::with_default(sub, || { + for _ in 0..21 { + info!(message = "Hello world!", internal_log_rate_limit = false); + MockClock::advance(Duration::from_millis(100)); + } + }); - let events: Arc>> = Default::default(); + let events = events.lock().unwrap(); - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + // All 21 events should be emitted since rate limiting is disabled + assert_eq!(events.len(), 21); + assert!(events.iter().all(|e| e == &event!("Hello world!"))); + } + + #[test] + #[serial] + fn rate_limit_ignores_non_special_fields() { + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { - for _ in 0..21 { - for key in &["foo", "bar"] { - for line_number in &[1, 2] { - let span = - info_span!("span", component_id = &key, vrl_position = &line_number); - let _enter = span.enter(); - info!( - message = format!("Hello {key} on line_number {line_number}!").as_str(), - internal_log_rate_limit = true - ); - } + for i in 0..21 { + // Call the SAME info! macro multiple times per iteration with varying fanout_id + // to verify that fanout_id doesn't create separate rate limit groups + for _ in 0..3 { + let fanout = if i % 2 == 0 { "output_1" } else { "output_2" }; + info!( + message = "Routing event", + component_id = "router", + fanout_id = fanout + ); } MockClock::advance(Duration::from_millis(100)); } @@ -596,106 +876,140 @@ mod test { let events = events.lock().unwrap(); + // All events share the same rate limit group (same callsite + component_id) + // First event emits normally, second shows suppression, third and beyond are silent + // until the window expires assert_eq!( *events, vec![ - "Hello foo on line_number 1!", - "Hello foo on line_number 2!", - "Hello bar on line_number 1!", - "Hello bar on line_number 2!", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1!", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2!", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1!", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2!", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1!", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2!", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1!", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2!", + // First iteration - first emits, second shows suppression, 3rd+ silent + event!("Routing event", component_id: "router", fanout_id: "output_1"), + event!("Internal log [Routing event] is being suppressed to avoid flooding."), + // After rate limit window (1 sec) - summary shows suppressions + event!("Internal log [Routing event] has been suppressed 29 times."), + event!("Routing event", component_id: "router", fanout_id: "output_1"), + event!("Internal log [Routing event] is being suppressed to avoid flooding."), + event!("Internal log [Routing event] has been suppressed 29 times."), + event!("Routing event", component_id: "router", fanout_id: "output_1"), + event!("Internal log [Routing event] is being suppressed to avoid flooding."), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } #[test] - fn rate_limit_by_event_key() { - let _guard = TRACING_DEFAULT_LOCK.lock().unwrap(); + #[serial] + fn nested_spans_child_takes_precedence() { + let (events, sub) = setup_test(1); + tracing::subscriber::with_default(sub, || { + // Create nested spans where child overrides parent's component_id + let outer = info_span!("outer", component_id = "parent"); + let _outer_guard = outer.enter(); - let events: Arc>> = Default::default(); + for _ in 0..21 { + // Inner span with different component_id should take precedence + let inner = info_span!("inner", component_id = "child"); + let _inner_guard = inner.enter(); + info!(message = "Nested event"); + drop(_inner_guard); - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(1)); + MockClock::advance(Duration::from_millis(100)); + } + }); + + let events = events.lock().unwrap(); + + // All events should be grouped by component_id = "child" (from inner span) + // not "parent" (from outer span), demonstrating child precedence + assert_eq!( + *events, + vec![ + event!("Nested event", component_id: "child"), + event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"), + event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"), + event!("Nested event", component_id: "child"), + event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"), + event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"), + event!("Nested event", component_id: "child"), + ] + ); + } + + #[test] + #[serial] + fn nested_spans_ignores_untracked_fields() { + let (events, sub) = setup_test(1); tracing::subscriber::with_default(sub, || { + // Parent has component_id, child has some_field - only component_id is tracked + let outer = info_span!("outer", component_id = "transform"); + let _outer_guard = outer.enter(); + for _ in 0..21 { - for key in &["foo", "bar"] { - for line_number in &[1, 2] { - info!( - message = format!("Hello {key} on line_number {line_number}!").as_str(), - internal_log_rate_limit = true, - component_id = &key, - vrl_position = &line_number - ); - } - } + let inner = info_span!("inner", some_field = "value"); + let _inner_guard = inner.enter(); + info!(message = "Event message"); + drop(_inner_guard); + + MockClock::advance(Duration::from_millis(100)); + } + }); + + let events = events.lock().unwrap(); + + // Events should have component_id from parent, some_field from child is ignored for grouping + // All events are in the same rate limit group + assert_eq!( + *events, + vec![ + event!("Event message", component_id: "transform"), + event!( + "Internal log [Event message] is being suppressed to avoid flooding.", + component_id: "transform" + ), + event!( + "Internal log [Event message] has been suppressed 9 times.", + component_id: "transform" + ), + event!("Event message", component_id: "transform"), + event!( + "Internal log [Event message] is being suppressed to avoid flooding.", + component_id: "transform" + ), + event!( + "Internal log [Event message] has been suppressed 9 times.", + component_id: "transform" + ), + event!("Event message", component_id: "transform"), + ] + ); + } + + #[test] + #[serial] + fn rate_limit_same_message_different_component() { + let (events, sub) = setup_test(1); + tracing::subscriber::with_default(sub, || { + // Use a loop with the SAME callsite to demonstrate that identical messages + // with different component_ids create separate rate limit groups + for component in &["foo", "foo", "bar"] { + info!(message = "Hello!", component_id = component); MockClock::advance(Duration::from_millis(100)); } }); let events = events.lock().unwrap(); + // The first "foo" event is emitted normally (count=0) + // The second "foo" event triggers suppression warning (count=1) + // The "bar" event is emitted normally (count=0 for its group) + // This proves that even with identical message text, different component_ids + // create separate rate limit groups assert_eq!( *events, vec![ - "Hello foo on line_number 1!", - "Hello foo on line_number 2!", - "Hello bar on line_number 1!", - "Hello bar on line_number 2!", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1!", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2!", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1!", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2!", - "Internal log [Hello foo on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 1!] is being suppressed to avoid flooding.", - "Internal log [Hello bar on line_number 2!] is being suppressed to avoid flooding.", - "Internal log [Hello foo on line_number 1!] has been suppressed 9 times.", - "Hello foo on line_number 1!", - "Internal log [Hello foo on line_number 2!] has been suppressed 9 times.", - "Hello foo on line_number 2!", - "Internal log [Hello bar on line_number 1!] has been suppressed 9 times.", - "Hello bar on line_number 1!", - "Internal log [Hello bar on line_number 2!] has been suppressed 9 times.", - "Hello bar on line_number 2!", + event!("Hello!", component_id: "foo"), + event!("Internal log [Hello!] is being suppressed to avoid flooding."), + event!("Hello!", component_id: "bar"), ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() ); } } diff --git a/src/cli.rs b/src/cli.rs index b4fcbed51ace6..5b9d39fc1b3cd 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -187,8 +187,8 @@ pub struct RootOpts { /// shows a suppression warning, and subsequent occurrences are silent until the /// window expires. /// - /// Logs are grouped by their location in the code and contextual fields so different log instances can be rate - /// limited independently. + /// Logs are grouped by their location in the code and the `component_id` field, so logs + /// from different components are rate limited independently. /// /// Examples: /// - 1: Very verbose, logs can repeat every second diff --git a/src/topology/builder.rs b/src/topology/builder.rs index c38a11c5ceba9..ff3eff5705484 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -229,7 +229,7 @@ impl<'a> Builder<'a> { .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)), ) { - debug!(component = %key, "Building new source."); + debug!(component_id = %key, "Building new source."); let typetag = source.inner.get_component_name(); let source_outputs = source.inner.outputs(self.config.schema.log_namespace()); @@ -429,7 +429,7 @@ impl<'a> Builder<'a> { .transforms() .filter(|(key, _)| self.diff.transforms.contains_new(key)) { - debug!(component = %key, "Building new transform."); + debug!(component_id = %key, "Building new transform."); let input_definitions = match schema::input_definitions( &transform.inputs, @@ -541,7 +541,7 @@ impl<'a> Builder<'a> { .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)), ) { - debug!(component = %key, "Building new sink."); + debug!(component_id = %key, "Building new sink."); let sink_inputs = &sink.inputs; let healthcheck = sink.healthcheck(); diff --git a/src/topology/running.rs b/src/topology/running.rs index cfedf92e7a4d9..fba759abc7afc 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -406,7 +406,7 @@ impl RunningTopology { let deadline = Instant::now() + timeout; for key in &diff.sources.to_remove { - debug!(component = %key, "Removing source."); + debug!(component_id = %key, "Removing source."); let previous = self.tasks.remove(key).unwrap(); drop(previous); // detach and forget @@ -417,7 +417,7 @@ impl RunningTopology { } for key in &diff.sources.to_change { - debug!(component = %key, "Changing source."); + debug!(component_id = %key, "Changing source."); self.remove_outputs(key); source_shutdown_handles @@ -446,7 +446,7 @@ impl RunningTopology { // depend on, and thus the closing of their buffer, will naturally cause them to shutdown, // which is why we don't do any manual triggering of shutdown here. for key in &diff.transforms.to_remove { - debug!(component = %key, "Removing transform."); + debug!(component_id = %key, "Removing transform."); let previous = self.tasks.remove(key).unwrap(); drop(previous); // detach and forget @@ -456,7 +456,7 @@ impl RunningTopology { } for key in &diff.transforms.to_change { - debug!(component = %key, "Changing transform."); + debug!(component_id = %key, "Changing transform."); self.remove_inputs(key, diff, new_config).await; self.remove_outputs(key); @@ -577,7 +577,7 @@ impl RunningTopology { })) .collect::>(); for key in &removed_sinks { - debug!(component = %key, "Removing sink."); + debug!(component_id = %key, "Removing sink."); self.remove_inputs(key, diff, new_config).await; } @@ -598,7 +598,7 @@ impl RunningTopology { .collect::>(); for key in &sinks_to_change { - debug!(component = %key, "Changing sink."); + debug!(component_id = %key, "Changing sink."); if reuse_buffers.contains(key) { self.detach_triggers .remove(key) @@ -630,7 +630,7 @@ impl RunningTopology { for key in &removed_sinks { let previous = self.tasks.remove(key).unwrap(); if wait_for_sinks.contains(key) { - debug!(message = "Waiting for sink to shutdown.", %key); + debug!(message = "Waiting for sink to shutdown.", component_id = %key); previous.await.unwrap().unwrap(); } else { drop(previous); // detach and forget @@ -641,7 +641,7 @@ impl RunningTopology { for key in &sinks_to_change { if wait_for_sinks.contains(key) { let previous = self.tasks.remove(key).unwrap(); - debug!(message = "Waiting for sink to shutdown.", %key); + debug!(message = "Waiting for sink to shutdown.", component_id = %key); let buffer = previous.await.unwrap().unwrap(); if reuse_buffers.contains(key) { @@ -751,7 +751,7 @@ impl RunningTopology { // We configure the outputs of any changed/added sources first, so they're available to any // transforms and sinks that come afterwards. for key in diff.sources.changed_and_added() { - debug!(component = %key, "Configuring outputs for source."); + debug!(component_id = %key, "Configuring outputs for source."); self.setup_outputs(key, new_pieces).await; } @@ -761,27 +761,27 @@ impl RunningTopology { .filter(|k| new_pieces.source_tasks.contains_key(k)) .collect(); for key in added_changed_table_sources.iter() { - debug!(component = %key, "Connecting outputs for enrichment table source."); + debug!(component_id = %key, "Connecting outputs for enrichment table source."); self.setup_outputs(key, new_pieces).await; } // We configure the outputs of any changed/added transforms next, for the same reason: we // need them to be available to any transforms and sinks that come afterwards. for key in diff.transforms.changed_and_added() { - debug!(component = %key, "Configuring outputs for transform."); + debug!(component_id = %key, "Configuring outputs for transform."); self.setup_outputs(key, new_pieces).await; } // Now that all possible outputs are configured, we can start wiring up inputs, starting // with transforms. for key in diff.transforms.changed_and_added() { - debug!(component = %key, "Connecting inputs for transform."); + debug!(component_id = %key, "Connecting inputs for transform."); self.setup_inputs(key, diff, new_pieces).await; } // Now that all sources and transforms are fully configured, we can wire up sinks. for key in diff.sinks.changed_and_added() { - debug!(component = %key, "Connecting inputs for sink."); + debug!(component_id = %key, "Connecting inputs for sink."); self.setup_inputs(key, diff, new_pieces).await; } let added_changed_tables: Vec<&ComponentKey> = diff @@ -790,7 +790,7 @@ impl RunningTopology { .filter(|k| new_pieces.inputs.contains_key(k)) .collect(); for key in added_changed_tables.iter() { - debug!(component = %key, "Connecting inputs for enrichment table sink."); + debug!(component_id = %key, "Connecting inputs for enrichment table sink."); self.setup_inputs(key, diff, new_pieces).await; } @@ -866,7 +866,7 @@ impl RunningTopology { ) { let outputs = new_pieces.outputs.remove(key).unwrap(); for (port, output) in outputs { - debug!(component = %key, output_id = ?port, "Configuring output for component."); + debug!(component_id = %key, output_id = ?port, "Configuring output for component."); let id = OutputId { component: key.clone(), @@ -903,7 +903,7 @@ impl RunningTopology { // If the input we're connecting to is changing, that means its outputs will have been // recreated, so instead of replacing a paused sink, we have to add it to this new // output for the first time, since there's nothing to actually replace at this point. - debug!(component = %key, fanout_id = %input, "Adding component input to fanout."); + debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout."); _ = output.send(ControlMessage::Add(key.clone(), tx.clone())); } else { @@ -911,7 +911,7 @@ impl RunningTopology { // components were changed, then the output must still exist, which means we paused // this component's connection to its output, so we have to replace that connection // now: - debug!(component = %key, fanout_id = %input, "Replacing component input in fanout."); + debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout."); _ = output.send(ControlMessage::Replace(key.clone(), tx.clone())); } @@ -955,14 +955,14 @@ impl RunningTopology { // because it isn't coming back. // // Case 3: This component is no longer connected to the input from new config. - debug!(component = %key, fanout_id = %input, "Removing component input from fanout."); + debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout."); _ = output.send(ControlMessage::Remove(key.clone())); } else { // We know that if this component is connected to a given input, and it isn't being // changed, then it will exist when we reconnect inputs, so we should pause it // now to pause further sends through that component until we reconnect: - debug!(component = %key, fanout_id = %input, "Pausing component input in fanout."); + debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout."); _ = output.send(ControlMessage::Pause(key.clone())); } @@ -978,7 +978,7 @@ impl RunningTopology { for (transform_key, transform) in unchanged_transforms { let changed_outputs = get_changed_outputs(diff, transform.inputs.clone()); for output_id in changed_outputs { - debug!(component = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); + debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); let input = self.inputs.get(transform_key).cloned().unwrap(); let output = self.outputs.get_mut(&output_id).unwrap(); @@ -993,7 +993,7 @@ impl RunningTopology { for (sink_key, sink) in unchanged_sinks { let changed_outputs = get_changed_outputs(diff, sink.inputs.clone()); for output_id in changed_outputs { - debug!(component = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); + debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout."); let input = self.inputs.get(sink_key).cloned().unwrap(); let output = self.outputs.get_mut(&output_id).unwrap(); @@ -1005,12 +1005,12 @@ impl RunningTopology { /// Starts any new or changed components in the given configuration diff. pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) { for key in &diff.sources.to_change { - debug!(message = "Spawning changed source.", key = %key); + debug!(message = "Spawning changed source.", component_id = %key); self.spawn_source(key, &mut new_pieces); } for key in &diff.sources.to_add { - debug!(message = "Spawning new source.", key = %key); + debug!(message = "Spawning new source.", component_id = %key); self.spawn_source(key, &mut new_pieces); } @@ -1029,32 +1029,32 @@ impl RunningTopology { .collect(); for key in changed_table_sources { - debug!(message = "Spawning changed enrichment table source.", key = %key); + debug!(message = "Spawning changed enrichment table source.", component_id = %key); self.spawn_source(key, &mut new_pieces); } for key in added_table_sources { - debug!(message = "Spawning new enrichment table source.", key = %key); + debug!(message = "Spawning new enrichment table source.", component_id = %key); self.spawn_source(key, &mut new_pieces); } for key in &diff.transforms.to_change { - debug!(message = "Spawning changed transform.", key = %key); + debug!(message = "Spawning changed transform.", component_id = %key); self.spawn_transform(key, &mut new_pieces); } for key in &diff.transforms.to_add { - debug!(message = "Spawning new transform.", key = %key); + debug!(message = "Spawning new transform.", component_id = %key); self.spawn_transform(key, &mut new_pieces); } for key in &diff.sinks.to_change { - debug!(message = "Spawning changed sink.", key = %key); + debug!(message = "Spawning changed sink.", component_id = %key); self.spawn_sink(key, &mut new_pieces); } for key in &diff.sinks.to_add { - trace!(message = "Spawning new sink.", key = %key); + trace!(message = "Spawning new sink.", component_id = %key); self.spawn_sink(key, &mut new_pieces); } @@ -1077,12 +1077,12 @@ impl RunningTopology { .collect(); for key in changed_tables { - debug!(message = "Spawning changed enrichment table sink.", key = %key); + debug!(message = "Spawning changed enrichment table sink.", component_id = %key); self.spawn_sink(key, &mut new_pieces); } for key in added_tables { - debug!(message = "Spawning enrichment table new sink.", key = %key); + debug!(message = "Spawning enrichment table new sink.", component_id = %key); self.spawn_sink(key, &mut new_pieces); } }