From 1013e9fd3806b29492bbdf00ddc3387b43357572 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Thu, 27 Feb 2025 15:26:48 -0500 Subject: [PATCH 01/25] Add config for maximum allowed line size after merging --- src/sources/kubernetes_logs/mod.rs | 31 ++++++- .../kubernetes_logs/partial_events_merger.rs | 88 +++++++++++++++++-- 2 files changed, 106 insertions(+), 13 deletions(-) diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index ef0f7bd947023..c7f66205f94b8 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -4,7 +4,7 @@ //! running inside the cluster as a DaemonSet. #![deny(missing_docs)] -use std::{path::PathBuf, time::Duration}; +use std::{cmp::min, path::PathBuf, time::Duration}; use bytes::Bytes; use chrono::Utc; @@ -193,6 +193,16 @@ pub struct Config { #[configurable(metadata(docs::type_unit = "bytes"))] max_line_bytes: usize, + /// The maximum number of bytes a line can contain - after merging - before being discarded. + /// + /// This protects against malformed lines or tailing incorrect files. + /// + /// Note that, if auto_partial_merge is false, this config will be ignored. Also, if max_line_bytes is too small to reach the continuation character, then this + /// config will have no practical impact (the same is true of auto_partial_merge). Finally, the smaller of max_merged_line_bytes and max_line_bytes will apply + /// if auto_partial_merge is true, so if this is set to eg 1 MiB but max_line_bytes is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped. + #[configurable(metadata(docs::type_unit = "bytes"))] + max_merged_line_bytes: usize, + /// The number of lines to read for generating the checksum. /// /// If your files share a common header that is not always a fixed size, @@ -294,6 +304,7 @@ impl Default for Config { max_read_bytes: default_max_read_bytes(), oldest_first: default_oldest_first(), max_line_bytes: default_max_line_bytes(), + max_merged_line_bytes: default_max_merged_line_bytes(), fingerprint_lines: default_fingerprint_lines(), glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(), ingestion_timestamp_field: None, @@ -553,6 +564,7 @@ struct Source { max_read_bytes: usize, oldest_first: bool, max_line_bytes: usize, + max_merged_line_bytes: usize, fingerprint_lines: usize, glob_minimum_cooldown: Duration, use_apiserver_cache: bool, @@ -641,6 +653,7 @@ impl Source { max_read_bytes: config.max_read_bytes, oldest_first: config.oldest_first, max_line_bytes: config.max_line_bytes, + max_merged_line_bytes: config.max_merged_line_bytes, fingerprint_lines: config.fingerprint_lines, glob_minimum_cooldown, use_apiserver_cache: config.use_apiserver_cache, @@ -676,6 +689,7 @@ impl Source { max_read_bytes, oldest_first, max_line_bytes, + max_merged_line_bytes, fingerprint_lines, glob_minimum_cooldown, use_apiserver_cache, @@ -775,6 +789,11 @@ impl Source { let ignore_before = calculate_ignore_before(ignore_older_secs); + let mut resolved_max_line_bytes = max_line_bytes; + if auto_partial_merge { + resolved_max_line_bytes = min(max_line_bytes, max_merged_line_bytes); + } + // TODO: maybe more of the parameters have to be configurable. let checkpointer = Checkpointer::new(&data_dir); @@ -799,7 +818,7 @@ impl Source { ignore_before, // The maximum number of bytes a line can contain before being discarded. This // protects against malformed lines or tailing incorrect files. 
- max_line_bytes, + max_line_bytes: resolved_max_line_bytes, // Delimiter bytes that is used to read the file line-by-line line_delimiter: Bytes::from("\n"), // The directory where to keep the checkpoints. @@ -817,7 +836,7 @@ impl Source { ignored_header_bytes: 0, lines: fingerprint_lines, }, - max_line_length: max_line_bytes, + max_line_length: resolved_max_line_bytes, ignore_not_found: true, }, oldest_first, @@ -893,7 +912,7 @@ impl Source { let (events_count, _) = events.size_hint(); let mut stream = if auto_partial_merge { - merge_partial_events(events, log_namespace).left_stream() + merge_partial_events(events, log_namespace, max_merged_line_bytes).left_stream() } else { events.right_stream() }; @@ -1025,6 +1044,10 @@ const fn default_max_line_bytes() -> usize { 32 * 1024 // 32 KiB } +const fn default_max_merged_line_bytes() -> usize { + 50 * 1024 * 1024 // 50 MiB +} + const fn default_glob_minimum_cooldown_ms() -> Duration { Duration::from_millis(60_000) } diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 14628df46ad6e..a377c129f040a 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -29,25 +29,42 @@ impl PartialEventMergeState { file: &str, message_path: &OwnedTargetPath, expiration_time: Duration, + max_merged_line_bytes: usize, ) { + let mut bytes_mut = BytesMut::new(); if let Some(bucket) = self.buckets.get_mut(file) { + // don't bother continuing to process events that are already too big + if bucket.too_big { + return; + } + // merging with existing event if let (Some(Value::Bytes(prev_value)), Some(Value::Bytes(new_value))) = (bucket.event.get_mut(message_path), event.get(message_path)) { - let mut bytes_mut = BytesMut::new(); bytes_mut.extend_from_slice(prev_value); bytes_mut.extend_from_slice(new_value); + + // drop event if it's bigger than max allowed + if bytes_mut.len() > max_merged_line_bytes { + bucket.too_big = true; + } + *prev_value = bytes_mut.freeze(); } } else { + if let Some(Value::Bytes(event_bytes)) = event.get(message_path) { + bytes_mut.extend_from_slice(event_bytes); + } + // new event self.buckets.insert( file.to_owned(), Bucket { event, expiration: Instant::now() + expiration_time, + too_big: bytes_mut.len() > max_merged_line_bytes, }, ); } @@ -70,7 +87,9 @@ impl PartialEventMergeState { fn flush_events(&mut self, emitter: &mut Emitter) { for (_, bucket) in self.buckets.drain() { - emitter.emit(bucket.event); + if !bucket.too_big { + emitter.emit(bucket.event); + } } } } @@ -78,13 +97,15 @@ impl PartialEventMergeState { struct Bucket { event: LogEvent, expiration: Instant, + too_big: bool, } pub fn merge_partial_events( stream: impl Stream + 'static, log_namespace: LogNamespace, + max_merged_line_bytes: usize, ) -> impl Stream { - merge_partial_events_with_custom_expiration(stream, log_namespace, EXPIRATION_TIME) + merge_partial_events_with_custom_expiration(stream, log_namespace, EXPIRATION_TIME, max_merged_line_bytes) } // internal function that allows customizing the expiration time (for testing) @@ -92,6 +113,7 @@ fn merge_partial_events_with_custom_expiration( stream: impl Stream + 'static, log_namespace: LogNamespace, expiration_time: Duration, + max_merged_line_bytes: usize, ) -> impl Stream { let partial_flag_path = match log_namespace { LogNamespace::Vector => { @@ -132,7 +154,7 @@ fn merge_partial_events_with_custom_expiration( .map(|x| x.to_string()) .unwrap_or_default(); - 
state.add_event(event, &file, &message_path, expiration_time); + state.add_event(event, &file, &message_path, expiration_time, max_merged_line_bytes); if !is_partial { if let Some(log_event) = state.remove_event(&file) { emitter.emit(log_event); @@ -164,7 +186,7 @@ mod test { e_1.insert("foo", 1); let input_stream = futures::stream::iter([e_1.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy); + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -174,6 +196,18 @@ mod test { ); } + #[tokio::test] + async fn merge_single_event_legacy_too_big() { + let mut e_1 = LogEvent::from("test message 1"); + e_1.insert("foo", 1); + + let input_stream = futures::stream::iter([e_1.into()]); + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 1); + + let output: Vec = output_stream.collect().await; + assert_eq!(output.len(), 0); + } + #[tokio::test] async fn merge_multiple_events_legacy() { let mut e_1 = LogEvent::from("test message 1"); @@ -184,7 +218,7 @@ mod test { e_2.insert("foo2", 1); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy); + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -194,6 +228,23 @@ mod test { ); } + #[tokio::test] + async fn merge_multiple_events_legacy_too_big() { + let mut e_1 = LogEvent::from("test message 1"); + e_1.insert("foo", 1); + e_1.insert("_partial", true); + + let mut e_2 = LogEvent::from("test message 2"); + e_2.insert("foo2", 1); + + let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); + // 32 > length of first message but less than the two combined + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 32); + + let output: Vec = output_stream.collect().await; + assert_eq!(output.len(), 0); + } + #[tokio::test] async fn multiple_events_flush_legacy() { let mut e_1 = LogEvent::from("test message 1"); @@ -205,7 +256,7 @@ mod test { e_1.insert("_partial", true); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy); + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -215,6 +266,24 @@ mod test { ); } + #[tokio::test] + async fn multiple_events_flush_legacy_too_big() { + let mut e_1 = LogEvent::from("test message 1"); + e_1.insert("foo", 1); + e_1.insert("_partial", true); + + let mut e_2 = LogEvent::from("test message 2"); + e_2.insert("foo2", 1); + e_1.insert("_partial", true); + + let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); + // 32 > length of first message but less than the two combined + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 32); + + let output: Vec = output_stream.collect().await; + assert_eq!(output.len(), 0); + } + #[tokio::test] async fn multiple_events_expire_legacy() { let mut e_1 = LogEvent::from("test message"); @@ -233,6 +302,7 @@ mod test { input_stream, LogNamespace::Legacy, Duration::from_secs(1), + 50*1024*1024, ); let output: Vec = output_stream.take(2).collect().await; @@ -256,7 +326,7 @@ mod test { ); let input_stream = 
futures::stream::iter([e_1.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Vector); + let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -286,7 +356,7 @@ mod test { ); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Vector); + let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); From 2b3ea0a37de9f8363f0d981f0317edae5a750e72 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Thu, 27 Feb 2025 15:58:46 -0500 Subject: [PATCH 02/25] Add warns when we drop partial logs for being too big; shift some comments around --- .../kubernetes_logs/partial_events_merger.rs | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index a377c129f040a..e7270872d5976 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -33,7 +33,7 @@ impl PartialEventMergeState { ) { let mut bytes_mut = BytesMut::new(); if let Some(bucket) = self.buckets.get_mut(file) { - // don't bother continuing to process events that are already too big + // don't bother continuing to process new partial events that match existing ones that are already too big if bucket.too_big { return; } @@ -49,22 +49,36 @@ impl PartialEventMergeState { // drop event if it's bigger than max allowed if bytes_mut.len() > max_merged_line_bytes { bucket.too_big = true; + warn!( + message = "Found line that exceeds max_merged_line_bytes; discarding.", + internal_log_rate_limit = true + ); } *prev_value = bytes_mut.freeze(); } } else { + // new event + if let Some(Value::Bytes(event_bytes)) = event.get(message_path) { bytes_mut.extend_from_slice(event_bytes); } - // new event + let too_big = bytes_mut.len() > max_merged_line_bytes; + + if too_big { + warn!( + message = "Found line that exceeds max_merged_line_bytes; discarding.", + internal_log_rate_limit = true + ); + } + self.buckets.insert( file.to_owned(), Bucket { event, expiration: Instant::now() + expiration_time, - too_big: bytes_mut.len() > max_merged_line_bytes, + too_big, }, ); } From cd9d607f46bd7a94d3a22c445b7ad95c83b81c3c Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 3 Mar 2025 16:26:02 -0500 Subject: [PATCH 03/25] Add changelog --- changelog.d/22581_max_merged_line_bytes.feature.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changelog.d/22581_max_merged_line_bytes.feature.md diff --git a/changelog.d/22581_max_merged_line_bytes.feature.md b/changelog.d/22581_max_merged_line_bytes.feature.md new file mode 100644 index 0000000000000..b7ce5502f09b2 --- /dev/null +++ b/changelog.d/22581_max_merged_line_bytes.feature.md @@ -0,0 +1,6 @@ +The `kubernetes_logs` source now supports a new configuration called `max_merged_line_bytes` which allows limiting the size +of lines even when they have been assembled via `auto_partial_merge` (the existing `max_line_bytes` field only applies +before merging, and as such makes it impossible to limit lines assembled via merging, short of specifying a size so small +that the continuation character isn't reached, and merging doesn't happen at all). 
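For example (illustrative values, mirroring the doc comment on this option): because the smaller of the two limits wins when `auto_partial_merge` is true, the effective cap is `min(max_line_bytes, max_merged_line_bytes)`, so setting `max_merged_line_bytes` to 1 MiB alongside a `max_line_bytes` of ~2.5 MiB drops every merged line larger than 1 MiB.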
+ +authors: ganelo From dd5fad04abf7c50cd6c937e5da344da8f36f0204 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 3 Mar 2025 16:28:04 -0500 Subject: [PATCH 04/25] Format --- .../kubernetes_logs/partial_events_merger.rs | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index e7270872d5976..12899e2473a1b 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -119,7 +119,12 @@ pub fn merge_partial_events( log_namespace: LogNamespace, max_merged_line_bytes: usize, ) -> impl Stream { - merge_partial_events_with_custom_expiration(stream, log_namespace, EXPIRATION_TIME, max_merged_line_bytes) + merge_partial_events_with_custom_expiration( + stream, + log_namespace, + EXPIRATION_TIME, + max_merged_line_bytes, + ) } // internal function that allows customizing the expiration time (for testing) @@ -168,7 +173,13 @@ fn merge_partial_events_with_custom_expiration( .map(|x| x.to_string()) .unwrap_or_default(); - state.add_event(event, &file, &message_path, expiration_time, max_merged_line_bytes); + state.add_event( + event, + &file, + &message_path, + expiration_time, + max_merged_line_bytes, + ); if !is_partial { if let Some(log_event) = state.remove_event(&file) { emitter.emit(log_event); @@ -200,7 +211,8 @@ mod test { e_1.insert("foo", 1); let input_stream = futures::stream::iter([e_1.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -232,7 +244,8 @@ mod test { e_2.insert("foo2", 1); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -270,7 +283,8 @@ mod test { e_1.insert("_partial", true); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -316,7 +330,7 @@ mod test { input_stream, LogNamespace::Legacy, Duration::from_secs(1), - 50*1024*1024, + 50 * 1024 * 1024, ); let output: Vec = output_stream.take(2).collect().await; @@ -340,7 +354,8 @@ mod test { ); let input_stream = futures::stream::iter([e_1.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Vector, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -370,7 +385,8 @@ mod test { ); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Vector, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); From 
354008cc9896cca81535f3f455bddd528c231f86 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Fri, 7 Mar 2025 14:29:43 -0500 Subject: [PATCH 05/25] Increment component_discarded_events_total on violation of max_line_size and max_merged_line_size --- lib/file-source/src/buffer.rs | 46 ++++++++++------ lib/file-source/src/file_server.rs | 16 +++++- lib/file-source/src/file_watcher/mod.rs | 54 ++++++++++++++----- .../src/file_watcher/tests/experiment.rs | 9 ++-- .../tests/experiment_no_truncations.rs | 14 +++-- lib/file-source/src/fingerprinter.rs | 3 ++ lib/file-source/src/internal_events.rs | 9 ++++ src/internal_events/file.rs | 38 +++++++++++++ src/internal_events/kubernetes_logs.rs | 24 +++++++++ .../kubernetes_logs/partial_events_merger.rs | 20 ++++--- 10 files changed, 187 insertions(+), 46 deletions(-) diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs index c8dbe1f400905..bfdb860b0ddbf 100644 --- a/lib/file-source/src/buffer.rs +++ b/lib/file-source/src/buffer.rs @@ -2,10 +2,14 @@ use std::io::{self, BufRead}; use bstr::Finder; use bytes::BytesMut; -use tracing::warn; use crate::FilePosition; +pub struct ReadResult { + pub successfully_read: Option, + pub discarded_for_size: Vec, +} + /// Read up to `max_size` bytes from `reader`, splitting by `delim` /// /// The function reads up to `max_size` bytes from `reader`, splitting the input @@ -29,17 +33,18 @@ use crate::FilePosition; /// Benchmarks indicate that this function processes in the high single-digit /// GiB/s range for buffers of length 1KiB. For buffers any smaller than this /// the overhead of setup dominates our benchmarks. -pub fn read_until_with_max_size( - reader: &mut R, - position: &mut FilePosition, - delim: &[u8], - buf: &mut BytesMut, +pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( + reader: &'a mut R, + position: &'a mut FilePosition, + delim: &'a [u8], + buf: &'a mut BytesMut, max_size: usize, -) -> io::Result> { +) -> io::Result { let mut total_read = 0; let mut discarding = false; let delim_finder = Finder::new(delim); let delim_len = delim.len(); + let mut discarded_for_size = Vec::new(); loop { let available: &[u8] = match reader.fill_buf() { Ok(n) => n, @@ -68,16 +73,16 @@ pub fn read_until_with_max_size( total_read += used; if !discarding && buf.len() > max_size { - warn!( - message = "Found line that exceeds max_line_bytes; discarding.", - internal_log_rate_limit = true - ); + discarded_for_size.push(buf.clone()); discarding = true; } if done { if !discarding { - return Ok(Some(total_read)); + return Ok(ReadResult { + successfully_read: Some(total_read), + discarded_for_size, + }); } else { discarding = false; buf.clear(); @@ -87,7 +92,10 @@ pub fn read_until_with_max_size( // us to observe an incomplete write. We return None here and let the loop continue // next time the method is called. This is safe because the buffer is specific to this // FileWatcher. 
- return Ok(None); + return Ok(ReadResult { + successfully_read: None, + discarded_for_size, + }); } } } @@ -99,6 +107,8 @@ mod test { use bytes::{BufMut, BytesMut}; use quickcheck::{QuickCheck, TestResult}; + use crate::buffer::ReadResult; + use super::read_until_with_max_size; fn qc_inner(chunks: Vec>, delim: u8, max_size: NonZeroU8) -> TestResult { @@ -181,7 +191,10 @@ mod test { ) .unwrap() { - None => { + ReadResult { + successfully_read: None, + discarded_for_size: _, + } => { // Subject only returns None if this is the last chunk _and_ // the chunk did not contain a delimiter _or_ the delimiter // was outside the max_size range _or_ the current chunk is empty. @@ -190,7 +203,10 @@ mod test { .any(|details| ((details.chunk_index == idx) && details.within_max_size)); assert!(chunk.is_empty() || !has_valid_delimiter) } - Some(total_read) => { + ReadResult { + successfully_read: Some(total_read), + discarded_for_size: _, + } => { // Now that the function has returned we confirm that the // returned details match our `first_delim` and also that // the `buffer` is populated correctly. diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 510b20694d0eb..d460299976e70 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace}; use crate::{ checkpointer::{Checkpointer, CheckpointsView}, - file_watcher::FileWatcher, + file_watcher::{FileWatcher, RawLineResult}, fingerprinter::{FileFingerprint, Fingerprinter}, paths_provider::PathsProvider, FileSourceInternalEvents, ReadFrom, @@ -240,7 +240,19 @@ where let start = time::Instant::now(); let mut bytes_read: usize = 0; - while let Ok(Some(line)) = watcher.read_line() { + while let Ok(RawLineResult { + raw_line: Some(line), + discarded_for_size, + }) = watcher.read_line() + { + discarded_for_size.iter().for_each(|buf| { + self.emitter.emit_file_line_too_long( + &buf.clone(), + self.max_line_bytes, + buf.len(), + ) + }); + let sz = line.bytes.len(); trace!( message = "Read bytes.", diff --git a/lib/file-source/src/file_watcher/mod.rs b/lib/file-source/src/file_watcher/mod.rs index 80db2b9bc876c..662374ad3c8e4 100644 --- a/lib/file-source/src/file_watcher/mod.rs +++ b/lib/file-source/src/file_watcher/mod.rs @@ -12,7 +12,9 @@ use tracing::debug; use vector_common::constants::GZIP_MAGIC; use crate::{ - buffer::read_until_with_max_size, metadata_ext::PortableFileExt, FilePosition, ReadFrom, + buffer::{read_until_with_max_size, ReadResult}, + metadata_ext::PortableFileExt, + FilePosition, ReadFrom, }; #[cfg(test)] mod tests; @@ -28,6 +30,12 @@ pub(super) struct RawLine { pub bytes: Bytes, } +#[derive(Debug)] +pub struct RawLineResult { + pub raw_line: Option, + pub discarded_for_size: Vec, +} + /// The `FileWatcher` struct defines the polling based state machine which reads /// from a file path, transparently updating the underlying file descriptor when /// the file has been rolled over, as is common for logs. @@ -207,7 +215,7 @@ impl FileWatcher { /// This function will attempt to read a new line from its file, blocking, /// up to some maximum but unspecified amount of time. `read_line` will open /// a new file handler as needed, transparently to the caller. 
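// A hedged usage sketch, not part of this patch: it mirrors the `file_server.rs`
// hunk above, with `watcher`, `emitter`, and `max_line_bytes` assumed in scope.
// Callers now destructure `RawLineResult` instead of matching on `Option<RawLine>`:
//
//     while let Ok(RawLineResult { raw_line: Some(line), discarded_for_size }) = watcher.read_line() {
//         discarded_for_size
//             .iter()
//             .for_each(|buf| emitter.emit_file_line_too_long(buf, max_line_bytes, buf.len()));
//         // ...process `line.bytes` as before...
//     }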
- pub(super) fn read_line(&mut self) -> io::Result> { + pub(super) fn read_line(&mut self) -> io::Result { self.track_read_attempt(); let reader = &mut self.reader; @@ -220,14 +228,23 @@ impl FileWatcher { &mut self.buf, self.max_line_bytes, ) { - Ok(Some(_)) => { + Ok(ReadResult { + successfully_read: Some(_), + discarded_for_size, + }) => { self.track_read_success(); - Ok(Some(RawLine { - offset: initial_position, - bytes: self.buf.split().freeze(), - })) + Ok(RawLineResult { + raw_line: Some(RawLine { + offset: initial_position, + bytes: self.buf.split().freeze(), + }), + discarded_for_size, + }) } - Ok(None) => { + Ok(ReadResult { + successfully_read: None, + discarded_for_size, + }) => { if !self.file_findable() { self.set_dead(); // File has been deleted, so return what we have in the buffer, even though it @@ -237,16 +254,25 @@ impl FileWatcher { if buf.is_empty() { // EOF self.reached_eof = true; - Ok(None) + Ok(RawLineResult { + raw_line: None, + discarded_for_size, + }) } else { - Ok(Some(RawLine { - offset: initial_position, - bytes: buf, - })) + Ok(RawLineResult { + raw_line: Some(RawLine { + offset: initial_position, + bytes: buf, + }), + discarded_for_size, + }) } } else { self.reached_eof = true; - Ok(None) + Ok(RawLineResult { + raw_line: None, + discarded_for_size, + }) } } Err(e) => { diff --git a/lib/file-source/src/file_watcher/tests/experiment.rs b/lib/file-source/src/file_watcher/tests/experiment.rs index decdbdab98240..cbbfabe0a67b7 100644 --- a/lib/file-source/src/file_watcher/tests/experiment.rs +++ b/lib/file-source/src/file_watcher/tests/experiment.rs @@ -8,7 +8,7 @@ use bytes::Bytes; use quickcheck::{QuickCheck, TestResult}; use crate::{ - file_watcher::{tests::*, FileWatcher}, + file_watcher::{tests::*, FileWatcher, RawLineResult}, ReadFrom, }; @@ -96,11 +96,14 @@ fn experiment(actions: Vec) { Err(_) => { unreachable!(); } - Ok(Some(line)) if line.bytes.is_empty() => { + Ok(RawLineResult { + raw_line: Some(line), + .. + }) if line.bytes.is_empty() => { attempts -= 1; continue; } - Ok(None) => { + Ok(RawLineResult { raw_line: None, .. }) => { attempts -= 1; continue; } diff --git a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs index ee8a24a9f95bf..78aa7536b9a38 100644 --- a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs +++ b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use quickcheck::{QuickCheck, TestResult}; use crate::{ - file_watcher::{tests::*, FileWatcher}, + file_watcher::{tests::*, FileWatcher, RawLineResult}, ReadFrom, }; @@ -63,17 +63,23 @@ fn experiment_no_truncations(actions: Vec) { Err(_) => { unreachable!(); } - Ok(Some(line)) if line.bytes.is_empty() => { + Ok(RawLineResult { + raw_line: Some(line), + .. + }) if line.bytes.is_empty() => { attempts -= 1; assert!(fwfiles[read_index].read_line().is_none()); continue; } - Ok(None) => { + Ok(RawLineResult { raw_line: None, .. }) => { attempts -= 1; assert!(fwfiles[read_index].read_line().is_none()); continue; } - Ok(Some(line)) => { + Ok(RawLineResult { + raw_line: Some(line), + .. 
+ }) => { let exp = fwfiles[read_index].read_line().expect("could not readline"); assert_eq!(exp.into_bytes(), line.bytes); // assert_eq!(sz, buf.len() + 1); diff --git a/lib/file-source/src/fingerprinter.rs b/lib/file-source/src/fingerprinter.rs index 542e67393da35..0e864c6cfac73 100644 --- a/lib/file-source/src/fingerprinter.rs +++ b/lib/file-source/src/fingerprinter.rs @@ -382,6 +382,7 @@ mod test { time::Duration, }; + use bytes::BytesMut; use flate2::write::GzEncoder; use tempfile::{tempdir, TempDir}; @@ -803,5 +804,7 @@ mod test { fn emit_files_open(&self, _: usize) {} fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {} + + fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {} } } diff --git a/lib/file-source/src/internal_events.rs b/lib/file-source/src/internal_events.rs index 9eb60e65397a1..726ec09f29999 100644 --- a/lib/file-source/src/internal_events.rs +++ b/lib/file-source/src/internal_events.rs @@ -1,5 +1,7 @@ use std::{io::Error, path::Path, time::Duration}; +use bytes::BytesMut; + /// Every internal event in this crate has a corresponding /// method in this trait which should emit the event. pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static { @@ -26,4 +28,11 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static { fn emit_files_open(&self, count: usize); fn emit_path_globbing_failed(&self, path: &Path, error: &Error); + + fn emit_file_line_too_long( + &self, + bytes: &BytesMut, + configured_limit: usize, + encountered_size_so_far: usize, + ); } diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs index fba86d6ad1b72..acc8212940c5f 100644 --- a/src/internal_events/file.rs +++ b/src/internal_events/file.rs @@ -106,6 +106,7 @@ impl InternalEvent for FileIoError<'_, P> { mod source { use std::{io::Error, path::Path, time::Duration}; + use bytes::BytesMut; use metrics::counter; use vector_lib::file_source::FileSourceInternalEvents; @@ -496,6 +497,30 @@ mod source { } } + #[derive(Debug)] + pub struct FileLineTooBig<'a> { + pub bytes: &'a BytesMut, + pub configured_limit: usize, + pub encountered_size_so_far: usize, + } + + impl InternalEvent for FileLineTooBig<'_> { + fn emit(self) { + warn!( + message = "Found line that exceeds max_line_bytes; discarding.", + bytes = ?self.bytes, + configured_limit = self.configured_limit, + encountered_size_so_far = self.encountered_size_so_far, + internal_log_rate_limit = true, + ); + counter!( + "component_discarded_events_total", + "intentional" => "true", + ) + .increment(1); + } + } + #[derive(Clone)] pub struct FileSourceInternalEventsEmitter { pub include_file_metric_tag: bool, @@ -578,5 +603,18 @@ mod source { fn emit_path_globbing_failed(&self, path: &Path, error: &Error) { emit!(PathGlobbingError { path, error }); } + + fn emit_file_line_too_long( + &self, + bytes: &bytes::BytesMut, + configured_limit: usize, + encountered_size_so_far: usize, + ) { + emit!(FileLineTooBig { + bytes, + configured_limit, + encountered_size_so_far + }); + } } } diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs index 77dac30b1945b..c3ea2cec51587 100644 --- a/src/internal_events/kubernetes_logs.rs +++ b/src/internal_events/kubernetes_logs.rs @@ -205,3 +205,27 @@ impl InternalEvent for KubernetesLifecycleError { }); } } + +#[derive(Debug)] +pub struct KubernetesMergedLineTooBig<'a> { + pub event: &'a Event, + pub configured_limit: usize, + pub encountered_size_so_far: usize, +} + +impl InternalEvent for KubernetesMergedLineTooBig<'_> { + fn 
emit(self) { + warn!( + message = "Found line that exceeds max_merged_line_bytes; discarding.", + event = ?self.event, + configured_limit = self.configured_limit, + encountered_size_so_far = self.encountered_size_so_far, + internal_log_rate_limit = true, + ); + counter!( + "component_discarded_events_total", + "intentional" => "true", + ) + .increment(1); + } +} diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 12899e2473a1b..809172569b2f5 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -11,6 +11,7 @@ use vrl::owned_value_path; use crate::event; use crate::event::{Event, LogEvent, Value}; +use crate::internal_events::KubernetesMergedLineTooBig; use crate::sources::kubernetes_logs::transform_utils::get_message_path; /// The key we use for `file` field. @@ -49,10 +50,11 @@ impl PartialEventMergeState { // drop event if it's bigger than max allowed if bytes_mut.len() > max_merged_line_bytes { bucket.too_big = true; - warn!( - message = "Found line that exceeds max_merged_line_bytes; discarding.", - internal_log_rate_limit = true - ); + emit!(KubernetesMergedLineTooBig { + event: &Event::Log(event), + configured_limit: max_merged_line_bytes, + encountered_size_so_far: bytes_mut.len() + }); } *prev_value = bytes_mut.freeze(); @@ -67,10 +69,12 @@ impl PartialEventMergeState { let too_big = bytes_mut.len() > max_merged_line_bytes; if too_big { - warn!( - message = "Found line that exceeds max_merged_line_bytes; discarding.", - internal_log_rate_limit = true - ); + // perf impact of clone should be minimal since being here means no further processing of this event will occur + emit!(KubernetesMergedLineTooBig { + event: &Event::Log(event.clone()), + configured_limit: max_merged_line_bytes, + encountered_size_so_far: bytes_mut.len() + }); } self.buckets.insert( From 0d1b8067b2b133ec491d64d80d6cb751034988f3 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Thu, 13 Mar 2025 15:24:50 -0400 Subject: [PATCH 06/25] Update changelog.d/22581_max_merged_line_bytes.feature.md Co-authored-by: Pavlos Rontidis --- changelog.d/22581_max_merged_line_bytes.feature.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/changelog.d/22581_max_merged_line_bytes.feature.md b/changelog.d/22581_max_merged_line_bytes.feature.md index b7ce5502f09b2..b5d36e53bde52 100644 --- a/changelog.d/22581_max_merged_line_bytes.feature.md +++ b/changelog.d/22581_max_merged_line_bytes.feature.md @@ -1,6 +1,3 @@ -The `kubernetes_logs` source now supports a new configuration called `max_merged_line_bytes` which allows limiting the size -of lines even when they have been assembled via `auto_partial_merge` (the existing `max_line_bytes` field only applies -before merging, and as such makes it impossible to limit lines assembled via merging, short of specifying a size so small -that the continuation character isn't reached, and merging doesn't happen at all). +The `kubernetes_logs` source now includes a new `max_merged_line_bytes` configuration option. This setting enables users to cap the size of log lines after they’ve been combined using `auto_partial_merge`. Previously, the `max_line_bytes` field only restricted line sizes *before* merging, leaving no practical way to limit the length of merged lines—unless you set a size so tiny that it prevented merging altogether by stopping short of the continuation character. 
This new option gives you better control over merged line sizes. authors: ganelo From 8e0c9946ba82b865a29caa09112ccc6dfd6d30c9 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Thu, 27 Mar 2025 17:24:49 -0400 Subject: [PATCH 07/25] Don't emit expired events that are too big nor ones that don't appear to be partial; fix test --- src/sources/kubernetes_logs/partial_events_merger.rs | 8 ++++---- var/log/pods/default/checkpoints.json | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 var/log/pods/default/checkpoints.json diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 809172569b2f5..4bf4bd19ddb1f 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -89,14 +89,14 @@ impl PartialEventMergeState { } fn remove_event(&mut self, file: &str) -> Option { - self.buckets.remove(file).map(|bucket| bucket.event) + self.buckets.remove(file).filter(|bucket| !bucket.too_big).map(|bucket| bucket.event) } fn emit_expired_events(&mut self, emitter: &mut Emitter) { let now = Instant::now(); self.buckets.retain(|_key, bucket| { let expired = now >= bucket.expiration; - if expired { + if expired && !bucket.too_big { emitter.emit(bucket.event.clone()); } !expired @@ -309,8 +309,8 @@ mod test { e_1.insert("_partial", true); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - // 32 > length of first message but less than the two combined - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 32); + // 24 > length of first message but less than the two combined + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 24); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 0); diff --git a/var/log/pods/default/checkpoints.json b/var/log/pods/default/checkpoints.json new file mode 100644 index 0000000000000..e7e12802f62c5 --- /dev/null +++ b/var/log/pods/default/checkpoints.json @@ -0,0 +1 @@ +{"version":"1","checkpoints":[]} \ No newline at end of file From cae7b01724c3b4f9edbe44590117cebf3f91cc7b Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Fri, 28 Mar 2025 11:55:08 -0400 Subject: [PATCH 08/25] Fix another test --- src/sources/kubernetes_logs/partial_events_merger.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 4bf4bd19ddb1f..4b2a52d549c2d 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -269,8 +269,8 @@ mod test { e_2.insert("foo2", 1); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - // 32 > length of first message but less than the two combined - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 32); + // 24 > length of first message but less than the two combined + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 24); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 0); From bc23d35020c2794ca3ee60a95c377c38fecf4a2f Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 31 Mar 2025 15:33:25 -0400 Subject: [PATCH 09/25] Update src/sources/kubernetes_logs/mod.rs Co-authored-by: Pavlos Rontidis --- src/sources/kubernetes_logs/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/kubernetes_logs/mod.rs 
b/src/sources/kubernetes_logs/mod.rs index c7f66205f94b8..67dc997319910 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -198,7 +198,7 @@ pub struct Config { /// This protects against malformed lines or tailing incorrect files. /// /// Note that, if auto_partial_merge is false, this config will be ignored. Also, if max_line_bytes is too small to reach the continuation character, then this - /// config will have no practical impact (the same is true of auto_partial_merge). Finally, the smaller of max_merged_line_bytes and max_line_bytes will apply + /// config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply /// if auto_partial_merge is true, so if this is set to eg 1 MiB but max_line_bytes is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped. #[configurable(metadata(docs::type_unit = "bytes"))] max_merged_line_bytes: usize, From 62bce6cef692f5d0bf07005cdcacb8d4e7643788 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 31 Mar 2025 15:33:35 -0400 Subject: [PATCH 10/25] Update src/sources/kubernetes_logs/mod.rs Co-authored-by: Pavlos Rontidis --- src/sources/kubernetes_logs/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 67dc997319910..47945bfe34cc3 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -199,7 +199,7 @@ pub struct Config { /// /// Note that, if auto_partial_merge is false, this config will be ignored. Also, if max_line_bytes is too small to reach the continuation character, then this /// config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply - /// if auto_partial_merge is true, so if this is set to eg 1 MiB but max_line_bytes is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped. + /// if auto_partial_merge is true, so if this is set to eg 1 MiB but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped. 
#[configurable(metadata(docs::type_unit = "bytes"))] max_merged_line_bytes: usize, From 4e93bc98dc5998c5049a776b39e1760f1eb40215 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 31 Mar 2025 15:37:03 -0400 Subject: [PATCH 11/25] Remove inadvertently added file --- var/log/pods/default/checkpoints.json | 1 - 1 file changed, 1 deletion(-) delete mode 100644 var/log/pods/default/checkpoints.json diff --git a/var/log/pods/default/checkpoints.json b/var/log/pods/default/checkpoints.json deleted file mode 100644 index e7e12802f62c5..0000000000000 --- a/var/log/pods/default/checkpoints.json +++ /dev/null @@ -1 +0,0 @@ -{"version":"1","checkpoints":[]} \ No newline at end of file From f9d7ca8c52ebe0613beab6e12c6ecf565b92c3d0 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 31 Mar 2025 16:04:44 -0400 Subject: [PATCH 12/25] Include Value rather than Event in error struct --- src/internal_events/kubernetes_logs.rs | 3 ++- .../kubernetes_logs/partial_events_merger.rs | 24 ++++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs index c3ea2cec51587..512ce7e3bcf0c 100644 --- a/src/internal_events/kubernetes_logs.rs +++ b/src/internal_events/kubernetes_logs.rs @@ -4,6 +4,7 @@ use vector_lib::{ internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL}, json_size::JsonSize, }; +use vrl::core::Value; use crate::event::Event; @@ -208,7 +209,7 @@ impl InternalEvent for KubernetesLifecycleError { #[derive(Debug)] pub struct KubernetesMergedLineTooBig<'a> { - pub event: &'a Event, + pub event: &'a Value, pub configured_limit: usize, pub encountered_size_so_far: usize, } diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 4b2a52d549c2d..7a9b90ec344df 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -50,8 +50,9 @@ impl PartialEventMergeState { // drop event if it's bigger than max allowed if bytes_mut.len() > max_merged_line_bytes { bucket.too_big = true; + // perf impact of clone should be minimal since being here means no further processing of this event will occur emit!(KubernetesMergedLineTooBig { - event: &Event::Log(event), + event: &Value::Bytes(new_value.clone()), configured_limit: max_merged_line_bytes, encountered_size_so_far: bytes_mut.len() }); @@ -62,19 +63,20 @@ impl PartialEventMergeState { } else { // new event + let mut too_big = false; + if let Some(Value::Bytes(event_bytes)) = event.get(message_path) { bytes_mut.extend_from_slice(event_bytes); - } + too_big = bytes_mut.len() > max_merged_line_bytes; - let too_big = bytes_mut.len() > max_merged_line_bytes; - - if too_big { - // perf impact of clone should be minimal since being here means no further processing of this event will occur - emit!(KubernetesMergedLineTooBig { - event: &Event::Log(event.clone()), - configured_limit: max_merged_line_bytes, - encountered_size_so_far: bytes_mut.len() - }); + if too_big { + // perf impact of clone should be minimal since being here means no further processing of this event will occur + emit!(KubernetesMergedLineTooBig { + event: &Value::Bytes(event_bytes.clone()), + configured_limit: max_merged_line_bytes, + encountered_size_so_far: bytes_mut.len() + }); + } } self.buckets.insert( From 6e582581dc760b7017c295689f6634c84b05ed03 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 31 Mar 2025 16:05:54 -0400 Subject: [PATCH 
13/25] Rename field in bucket struct --- .../kubernetes_logs/partial_events_merger.rs | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 7a9b90ec344df..b6d6817865d89 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -35,7 +35,7 @@ impl PartialEventMergeState { let mut bytes_mut = BytesMut::new(); if let Some(bucket) = self.buckets.get_mut(file) { // don't bother continuing to process new partial events that match existing ones that are already too big - if bucket.too_big { + if bucket.exceeds_max_merged_line_limit { return; } @@ -49,7 +49,7 @@ impl PartialEventMergeState { // drop event if it's bigger than max allowed if bytes_mut.len() > max_merged_line_bytes { - bucket.too_big = true; + bucket.exceeds_max_merged_line_limit = true; // perf impact of clone should be minimal since being here means no further processing of this event will occur emit!(KubernetesMergedLineTooBig { event: &Value::Bytes(new_value.clone()), @@ -63,13 +63,13 @@ impl PartialEventMergeState { } else { // new event - let mut too_big = false; + let mut exceeds_max_merged_line_limit = false; if let Some(Value::Bytes(event_bytes)) = event.get(message_path) { bytes_mut.extend_from_slice(event_bytes); - too_big = bytes_mut.len() > max_merged_line_bytes; + exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes; - if too_big { + if exceeds_max_merged_line_limit { // perf impact of clone should be minimal since being here means no further processing of this event will occur emit!(KubernetesMergedLineTooBig { event: &Value::Bytes(event_bytes.clone()), @@ -84,21 +84,21 @@ impl PartialEventMergeState { Bucket { event, expiration: Instant::now() + expiration_time, - too_big, + exceeds_max_merged_line_limit, }, ); } } fn remove_event(&mut self, file: &str) -> Option { - self.buckets.remove(file).filter(|bucket| !bucket.too_big).map(|bucket| bucket.event) + self.buckets.remove(file).filter(|bucket| !bucket.exceeds_max_merged_line_limit).map(|bucket| bucket.event) } fn emit_expired_events(&mut self, emitter: &mut Emitter) { let now = Instant::now(); self.buckets.retain(|_key, bucket| { let expired = now >= bucket.expiration; - if expired && !bucket.too_big { + if expired && !bucket.exceeds_max_merged_line_limit { emitter.emit(bucket.event.clone()); } !expired @@ -107,7 +107,7 @@ impl PartialEventMergeState { fn flush_events(&mut self, emitter: &mut Emitter) { for (_, bucket) in self.buckets.drain() { - if !bucket.too_big { + if !bucket.exceeds_max_merged_line_limit { emitter.emit(bucket.event); } } @@ -117,7 +117,7 @@ impl PartialEventMergeState { struct Bucket { event: LogEvent, expiration: Instant, - too_big: bool, + exceeds_max_merged_line_limit: bool, } pub fn merge_partial_events( @@ -229,7 +229,7 @@ mod test { } #[tokio::test] - async fn merge_single_event_legacy_too_big() { + async fn merge_single_event_legacy_exceeds_max_merged_line_limit() { let mut e_1 = LogEvent::from("test message 1"); e_1.insert("foo", 1); @@ -262,7 +262,7 @@ mod test { } #[tokio::test] - async fn merge_multiple_events_legacy_too_big() { + async fn merge_multiple_events_legacy_exceeds_max_merged_line_limit() { let mut e_1 = LogEvent::from("test message 1"); e_1.insert("foo", 1); e_1.insert("_partial", true); @@ -301,7 +301,7 @@ mod test { } #[tokio::test] - async fn multiple_events_flush_legacy_too_big() { + 
async fn multiple_events_flush_legacy_exceeds_max_merged_line_limit() { let mut e_1 = LogEvent::from("test message 1"); e_1.insert("foo", 1); e_1.insert("_partial", true); From e551a4c980093e0e04a946aaad74e41356ee5b9f Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 31 Mar 2025 16:10:27 -0400 Subject: [PATCH 14/25] Move max_merged_line_bytes from being a param to being a field on the state struct --- src/sources/kubernetes_logs/partial_events_merger.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index b6d6817865d89..94fca75740e6f 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -21,6 +21,7 @@ const EXPIRATION_TIME: Duration = Duration::from_secs(30); struct PartialEventMergeState { buckets: HashMap, + max_merged_line_bytes: usize, } impl PartialEventMergeState { @@ -30,7 +31,6 @@ impl PartialEventMergeState { file: &str, message_path: &OwnedTargetPath, expiration_time: Duration, - max_merged_line_bytes: usize, ) { let mut bytes_mut = BytesMut::new(); if let Some(bucket) = self.buckets.get_mut(file) { @@ -48,12 +48,12 @@ impl PartialEventMergeState { bytes_mut.extend_from_slice(new_value); // drop event if it's bigger than max allowed - if bytes_mut.len() > max_merged_line_bytes { + if bytes_mut.len() > self.max_merged_line_bytes { bucket.exceeds_max_merged_line_limit = true; // perf impact of clone should be minimal since being here means no further processing of this event will occur emit!(KubernetesMergedLineTooBig { event: &Value::Bytes(new_value.clone()), - configured_limit: max_merged_line_bytes, + configured_limit: self.max_merged_line_bytes, encountered_size_so_far: bytes_mut.len() }); } @@ -67,13 +67,13 @@ impl PartialEventMergeState { if let Some(Value::Bytes(event_bytes)) = event.get(message_path) { bytes_mut.extend_from_slice(event_bytes); - exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes; + exceeds_max_merged_line_limit = bytes_mut.len() > self.max_merged_line_bytes; if exceeds_max_merged_line_limit { // perf impact of clone should be minimal since being here means no further processing of this event will occur emit!(KubernetesMergedLineTooBig { event: &Value::Bytes(event_bytes.clone()), - configured_limit: max_merged_line_bytes, + configured_limit: self.max_merged_line_bytes, encountered_size_so_far: bytes_mut.len() }); } @@ -156,6 +156,7 @@ fn merge_partial_events_with_custom_expiration( let state = PartialEventMergeState { buckets: HashMap::new(), + max_merged_line_bytes, }; let message_path = get_message_path(log_namespace); @@ -184,7 +185,6 @@ fn merge_partial_events_with_custom_expiration( &file, &message_path, expiration_time, - max_merged_line_bytes, ); if !is_partial { if let Some(log_event) = state.remove_event(&file) { From 7bac39a9fafbbaf81912338cd7692f828b3a3390 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Thu, 3 Apr 2025 14:31:12 -0400 Subject: [PATCH 15/25] Make new config field optional, defaulting to old behavior --- src/sources/kubernetes_logs/mod.rs | 12 ++-- .../kubernetes_logs/partial_events_merger.rs | 66 ++++++++++--------- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 47945bfe34cc3..dfed33447dba9 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -201,7 +201,7 @@ pub 
struct Config {
     /// config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply
     /// if auto_partial_merge is true, so if this is set to eg 1 MiB but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
     #[configurable(metadata(docs::type_unit = "bytes"))]
-    max_merged_line_bytes: usize,
+    max_merged_line_bytes: Option<usize>,

     /// The number of lines to read for generating the checksum.
     ///
@@ -304,7 +304,7 @@ impl Default for Config {
             max_read_bytes: default_max_read_bytes(),
             oldest_first: default_oldest_first(),
             max_line_bytes: default_max_line_bytes(),
-            max_merged_line_bytes: default_max_merged_line_bytes(),
+            max_merged_line_bytes: None,
             fingerprint_lines: default_fingerprint_lines(),
             glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
             ingestion_timestamp_field: None,
@@ -564,7 +564,7 @@ struct Source {
     max_read_bytes: usize,
     oldest_first: bool,
     max_line_bytes: usize,
-    max_merged_line_bytes: usize,
+    max_merged_line_bytes: Option<usize>,
     fingerprint_lines: usize,
     glob_minimum_cooldown: Duration,
     use_apiserver_cache: bool,
@@ -791,7 +791,7 @@ impl Source {
         let mut resolved_max_line_bytes = max_line_bytes;
         if auto_partial_merge {
-            resolved_max_line_bytes = min(max_line_bytes, max_merged_line_bytes);
+            resolved_max_line_bytes = min(max_line_bytes, if let Some(configured_max_merged_line_bytes) = max_merged_line_bytes { configured_max_merged_line_bytes } else { max_line_bytes });
         }

         // TODO: maybe more of the parameters have to be configurable.
@@ -1044,10 +1044,6 @@ const fn default_max_line_bytes() -> usize {
     32 * 1024 // 32 KiB
 }

-const fn default_max_merged_line_bytes() -> usize {
-    50 * 1024 * 1024 // 50 MiB
-}
-
 const fn default_glob_minimum_cooldown_ms() -> Duration {
     Duration::from_millis(60_000)
 }
diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs
index 94fca75740e6f..c3f6faf2e3a4c 100644
--- a/src/sources/kubernetes_logs/partial_events_merger.rs
+++ b/src/sources/kubernetes_logs/partial_events_merger.rs
@@ -21,7 +21,7 @@ const EXPIRATION_TIME: Duration = Duration::from_secs(30);

 struct PartialEventMergeState {
     buckets: HashMap<String, Bucket>,
-    max_merged_line_bytes: usize,
+    maybe_max_merged_line_bytes: Option<usize>,
 }

 impl PartialEventMergeState {
@@ -48,14 +48,16 @@ impl PartialEventMergeState {
                 bytes_mut.extend_from_slice(new_value);

                 // drop event if it's bigger than max allowed
-                if bytes_mut.len() > self.max_merged_line_bytes {
-                    bucket.exceeds_max_merged_line_limit = true;
-                    // perf impact of clone should be minimal since being here means no further processing of this event will occur
-                    emit!(KubernetesMergedLineTooBig {
-                        event: &Value::Bytes(new_value.clone()),
-                        configured_limit: self.max_merged_line_bytes,
-                        encountered_size_so_far: bytes_mut.len()
-                    });
+                if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
+                    if bytes_mut.len() > max_merged_line_bytes {
+                        bucket.exceeds_max_merged_line_limit = true;
+                        // perf impact of clone should be minimal since being here means no further processing of this event will occur
+                        emit!(KubernetesMergedLineTooBig {
+                            event: &Value::Bytes(new_value.clone()),
+                            configured_limit: max_merged_line_bytes,
+                            encountered_size_so_far: bytes_mut.len()
+                        });
+                    }
                 }

                 *prev_value = bytes_mut.freeze();
@@ -67,15 +69,17 @@ impl PartialEventMergeState {
             if let Some(Value::Bytes(event_bytes)) = event.get(message_path) {
                 bytes_mut.extend_from_slice(event_bytes);
-                exceeds_max_merged_line_limit = bytes_mut.len() > self.max_merged_line_bytes;
-
-                if exceeds_max_merged_line_limit {
-                    // perf impact of clone should be minimal since being here means no further processing of this event will occur
-                    emit!(KubernetesMergedLineTooBig {
-                        event: &Value::Bytes(event_bytes.clone()),
-                        configured_limit: self.max_merged_line_bytes,
-                        encountered_size_so_far: bytes_mut.len()
-                    });
+                if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
+                    exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes;
+
+                    if exceeds_max_merged_line_limit {
+                        // perf impact of clone should be minimal since being here means no further processing of this event will occur
+                        emit!(KubernetesMergedLineTooBig {
+                            event: &Value::Bytes(event_bytes.clone()),
+                            configured_limit: max_merged_line_bytes,
+                            encountered_size_so_far: bytes_mut.len()
+                        });
+                    }
                 }
             }

@@ -123,13 +127,13 @@ struct Bucket {
 pub fn merge_partial_events(
     stream: impl Stream<Item = Event> + 'static,
     log_namespace: LogNamespace,
-    max_merged_line_bytes: usize,
+    maybe_max_merged_line_bytes: Option<usize>,
 ) -> impl Stream<Item = Event> {
     merge_partial_events_with_custom_expiration(
         stream,
         log_namespace,
         EXPIRATION_TIME,
-        max_merged_line_bytes,
+        maybe_max_merged_line_bytes,
     )
 }

@@ -138,7 +142,7 @@ fn merge_partial_events_with_custom_expiration(
     stream: impl Stream<Item = Event> + 'static,
     log_namespace: LogNamespace,
     expiration_time: Duration,
-    max_merged_line_bytes: usize,
+    maybe_max_merged_line_bytes: Option<usize>,
 ) -> impl Stream<Item = Event> {
     let partial_flag_path = match log_namespace {
         LogNamespace::Vector => {
@@ -156,7 +160,7 @@ fn merge_partial_events_with_custom_expiration(

     let state = PartialEventMergeState {
         buckets: HashMap::new(),
-        max_merged_line_bytes,
+        maybe_max_merged_line_bytes,
     };

     let message_path = get_message_path(log_namespace);
@@ -218,7 +222,7 @@ mod test {
         let input_stream = futures::stream::iter([e_1.into()]);
         let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024);
+            merge_partial_events(input_stream, LogNamespace::Legacy, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -234,7 +238,7 @@ mod test {
         e_1.insert("foo", 1);

         let input_stream = futures::stream::iter([e_1.into()]);
-        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 1);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(1));

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 0);
@@ -251,7 +255,7 @@ mod test {

         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
         let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024);
+            merge_partial_events(input_stream, LogNamespace::Legacy, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -272,7 +276,7 @@ mod test {

         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
         // 24 > length of first message but less than the two combined
-        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 24);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 0);
@@ -290,7 +294,7 @@ mod test {

         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
         let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024);
+            merge_partial_events(input_stream, LogNamespace::Legacy, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -312,7 +316,7 @@ mod test {

         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
         // 24 > length of first message but less than the two combined
-        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 24);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 0);
@@ -336,7 +340,7 @@ mod test {
             input_stream,
             LogNamespace::Legacy,
             Duration::from_secs(1),
-            50 * 1024 * 1024,
+            None,
         );

         let output: Vec<Event> = output_stream.take(2).collect().await;
@@ -361,7 +365,7 @@ mod test {

         let input_stream = futures::stream::iter([e_1.into()]);
         let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Vector, 50 * 1024 * 1024);
+            merge_partial_events(input_stream, LogNamespace::Vector, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -392,7 +396,7 @@ mod test {

         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
         let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Vector, 50 * 1024 * 1024);
+            merge_partial_events(input_stream, LogNamespace::Vector, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);

From 4975ebef49ec2d51722dd75af3183cee9915d661 Mon Sep 17 00:00:00 2001
From: Orri Ganel
Date: Thu, 3 Apr 2025 14:47:25 -0400
Subject: [PATCH 16/25] Format

---
 src/sources/kubernetes_logs/mod.rs            |  9 +++++-
 .../kubernetes_logs/partial_events_merger.rs  | 29 +++++++------------
 2 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs
index dfed33447dba9..d8c02704774e5 100644
--- a/src/sources/kubernetes_logs/mod.rs
+++ b/src/sources/kubernetes_logs/mod.rs
@@ -791,7 +791,14 @@ impl Source {

         let mut resolved_max_line_bytes = max_line_bytes;
         if auto_partial_merge {
-            resolved_max_line_bytes = min(max_line_bytes, if let Some(configured_max_merged_line_bytes) = max_merged_line_bytes { configured_max_merged_line_bytes } else { max_line_bytes });
+            resolved_max_line_bytes = min(
+                max_line_bytes,
+                if let Some(configured_max_merged_line_bytes) = max_merged_line_bytes {
+                    configured_max_merged_line_bytes
+                } else {
+                    max_line_bytes
+                },
+            );
         }

         // TODO: maybe more of the parameters have to be configurable.
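
As an aside, the limit resolution reformatted above reduces to a small, testable rule. The following sketch is illustrative only: `resolve_limit` is a hypothetical helper name, not part of the patch, and the later commit in this series collapses the `if let` into `unwrap_or`, exactly as shown here.

use std::cmp::min;

// Hypothetical helper mirroring how the source picks the effective per-line
// limit handed to the file server. Not part of the patch; for illustration.
fn resolve_limit(
    auto_partial_merge: bool,
    max_line_bytes: usize,
    max_merged_line_bytes: Option<usize>,
) -> usize {
    if auto_partial_merge {
        // The smaller of the two limits wins; an unset merged limit
        // falls back to max_line_bytes.
        min(max_line_bytes, max_merged_line_bytes.unwrap_or(max_line_bytes))
    } else {
        // Without auto_partial_merge, max_merged_line_bytes is ignored.
        max_line_bytes
    }
}

fn main() {
    assert_eq!(resolve_limit(true, 32 * 1024, None), 32 * 1024); // unset: unchanged
    assert_eq!(resolve_limit(true, 32 * 1024, Some(16 * 1024)), 16 * 1024); // smaller merged limit wins
    assert_eq!(resolve_limit(true, 32 * 1024, Some(1 << 20)), 32 * 1024); // larger merged limit is capped
    assert_eq!(resolve_limit(false, 32 * 1024, Some(1)), 32 * 1024); // ignored without auto_partial_merge
}
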
diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs
index c3f6faf2e3a4c..d5951f6f73284 100644
--- a/src/sources/kubernetes_logs/partial_events_merger.rs
+++ b/src/sources/kubernetes_logs/partial_events_merger.rs
@@ -70,7 +70,7 @@ impl PartialEventMergeState {
             if let Some(Value::Bytes(event_bytes)) = event.get(message_path) {
                 bytes_mut.extend_from_slice(event_bytes);
                 if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
-                exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes;
+                    exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes;

                     if exceeds_max_merged_line_limit {
                         // perf impact of clone should be minimal since being here means no further processing of this event will occur
@@ -95,7 +95,10 @@ impl PartialEventMergeState {
     }

     fn remove_event(&mut self, file: &str) -> Option<LogEvent> {
-        self.buckets.remove(file).filter(|bucket| !bucket.exceeds_max_merged_line_limit).map(|bucket| bucket.event)
+        self.buckets
+            .remove(file)
+            .filter(|bucket| !bucket.exceeds_max_merged_line_limit)
+            .map(|bucket| bucket.event)
     }

     fn emit_expired_events(&mut self, emitter: &mut Emitter<LogEvent>) {
@@ -184,12 +187,7 @@ fn merge_partial_events_with_custom_expiration(
                 .map(|x| x.to_string())
                 .unwrap_or_default();

-            state.add_event(
-                event,
-                &file,
-                &message_path,
-                expiration_time,
-            );
+            state.add_event(event, &file, &message_path, expiration_time);
             if !is_partial {
                 if let Some(log_event) = state.remove_event(&file) {
                     emitter.emit(log_event);
@@ -221,8 +219,7 @@ mod test {
         e_1.insert("foo", 1);

         let input_stream = futures::stream::iter([e_1.into()]);
-        let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Legacy, None);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -254,8 +251,7 @@ mod test {
         e_2.insert("foo2", 1);

         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
-        let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Legacy, None);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -293,8 +289,7 @@ mod test {
         e_1.insert("_partial", true);

         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
-        let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Legacy, None);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -364,8 +359,7 @@ mod test {
         );

         let input_stream = futures::stream::iter([e_1.into()]);
-        let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Vector, None);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -395,8 +389,7 @@ mod test {
         );

         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
-        let output_stream =
-            merge_partial_events(input_stream, LogNamespace::Vector, None);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);

From 7ca79d07d34decf7337a7b1c80a4573eb07949bc Mon Sep 17 00:00:00 2001
From: Orri Ganel
Date: Thu, 10 Apr 2025 15:25:37 -0400
Subject: [PATCH 17/25] Appease check-events

---
 src/internal_events/file.rs                   | 21 +++++++++++++------
 src/internal_events/kubernetes_logs.rs        | 20 ++++++++++++------
 .../kubernetes_logs/partial_events_merger.rs  |  6 +++---
 3 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs
index acc8212940c5f..1a12b3f0ba20c 100644
--- a/src/internal_events/file.rs
+++ b/src/internal_events/file.rs
@@ -109,6 +109,7 @@ mod source {
     use bytes::BytesMut;
     use metrics::counter;
     use vector_lib::file_source::FileSourceInternalEvents;
+    use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL};

     use super::{FileOpen, InternalEvent};
     use vector_lib::emit;
@@ -498,26 +499,34 @@ mod source {
     }

     #[derive(Debug)]
-    pub struct FileLineTooBig<'a> {
+    pub struct FileLineTooBigError<'a> {
         pub bytes: &'a BytesMut,
         pub configured_limit: usize,
         pub encountered_size_so_far: usize,
     }

-    impl InternalEvent for FileLineTooBig<'_> {
+    impl InternalEvent for FileLineTooBigError<'_> {
         fn emit(self) {
-            warn!(
+            error!(
                 message = "Found line that exceeds max_line_bytes; discarding.",
                 bytes = ?self.bytes,
                 configured_limit = self.configured_limit,
                 encountered_size_so_far = self.encountered_size_so_far,
                 internal_log_rate_limit = true,
+                error_type = error_type::CONDITION_FAILED,
+                stage = error_stage::RECEIVING,
             );
             counter!(
-                "component_discarded_events_total",
-                "intentional" => "true",
+                "component_errors_total",
+                "error_code" => "reading_line_from_file",
+                "error_type" => error_type::CONDITION_FAILED,
+                "stage" => error_stage::RECEIVING,
             )
             .increment(1);
+            emit!(ComponentEventsDropped::<INTENTIONAL> {
+                count: 1,
+                reason: "Found line that exceeds max_line_bytes; discarding.",
+            });
         }
     }

@@ -610,7 +619,7 @@ mod source {
             configured_limit: usize,
             encountered_size_so_far: usize,
         ) {
-            emit!(FileLineTooBig {
+            emit!(FileLineTooBigError {
                 bytes,
                 configured_limit,
                 encountered_size_so_far
diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs
index 512ce7e3bcf0c..39d27649cef5e 100644
--- a/src/internal_events/kubernetes_logs.rs
+++ b/src/internal_events/kubernetes_logs.rs
@@ -1,5 +1,5 @@
 use metrics::counter;
-use vector_lib::internal_event::InternalEvent;
+use vector_lib::internal_event::{InternalEvent, INTENTIONAL};
 use vector_lib::{
     internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL},
     json_size::JsonSize,
@@ -208,25 +208,33 @@ impl InternalEvent for KubernetesLifecycleError {
 }

 #[derive(Debug)]
-pub struct KubernetesMergedLineTooBig<'a> {
+pub struct KubernetesMergedLineTooBigError<'a> {
     pub event: &'a Value,
     pub configured_limit: usize,
     pub encountered_size_so_far: usize,
 }

-impl InternalEvent for KubernetesMergedLineTooBig<'_> {
+impl InternalEvent for KubernetesMergedLineTooBigError<'_> {
     fn emit(self) {
-        warn!(
+        error!(
             message = "Found line that exceeds max_merged_line_bytes; discarding.",
             event = ?self.event,
             configured_limit = self.configured_limit,
             encountered_size_so_far = self.encountered_size_so_far,
             internal_log_rate_limit = true,
+            error_type = error_type::CONDITION_FAILED,
+            stage = error_stage::RECEIVING,
         );
         counter!(
-            "component_discarded_events_total",
-            "intentional" => "true",
+            "component_errors_total",
+            "error_code" => "reading_line_from_kubernetes_log",
+            "error_type" => error_type::CONDITION_FAILED,
+            "stage" => error_stage::RECEIVING,
         )
         .increment(1);
+        emit!(ComponentEventsDropped::<INTENTIONAL> {
+            count: 1,
+            reason: "Found line that exceeds max_merged_line_bytes; discarding.",
+        });
     }
 }

diff --git
a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index d5951f6f73284..56752b49454cd 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -11,7 +11,7 @@ use vrl::owned_value_path; use crate::event; use crate::event::{Event, LogEvent, Value}; -use crate::internal_events::KubernetesMergedLineTooBig; +use crate::internal_events::KubernetesMergedLineTooBigError; use crate::sources::kubernetes_logs::transform_utils::get_message_path; /// The key we use for `file` field. @@ -52,7 +52,7 @@ impl PartialEventMergeState { if bytes_mut.len() > max_merged_line_bytes { bucket.exceeds_max_merged_line_limit = true; // perf impact of clone should be minimal since being here means no further processing of this event will occur - emit!(KubernetesMergedLineTooBig { + emit!(KubernetesMergedLineTooBigError { event: &Value::Bytes(new_value.clone()), configured_limit: max_merged_line_bytes, encountered_size_so_far: bytes_mut.len() @@ -74,7 +74,7 @@ impl PartialEventMergeState { if exceeds_max_merged_line_limit { // perf impact of clone should be minimal since being here means no further processing of this event will occur - emit!(KubernetesMergedLineTooBig { + emit!(KubernetesMergedLineTooBigError { event: &Value::Bytes(event_bytes.clone()), configured_limit: max_merged_line_bytes, encountered_size_so_far: bytes_mut.len() From c9919b42081cee27c890e4acb1a037f4930dda7c Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 28 Apr 2025 14:01:30 -0400 Subject: [PATCH 18/25] docs regen --- .../components/sources/base/kubernetes_logs.cue | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/website/cue/reference/components/sources/base/kubernetes_logs.cue b/website/cue/reference/components/sources/base/kubernetes_logs.cue index bed681ab70e3e..c933c8389cffe 100644 --- a/website/cue/reference/components/sources/base/kubernetes_logs.cue +++ b/website/cue/reference/components/sources/base/kubernetes_logs.cue @@ -191,6 +191,19 @@ base: components: sources: kubernetes_logs: configuration: { unit: "bytes" } } + max_merged_line_bytes: { + description: """ + The maximum number of bytes a line can contain - after merging - before being discarded. + + This protects against malformed lines or tailing incorrect files. + + Note that, if auto_partial_merge is false, this config will be ignored. Also, if max_line_bytes is too small to reach the continuation character, then this + config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply + if auto_partial_merge is true, so if this is set to eg 1 MiB but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped. + """ + required: false + type: uint: unit: "bytes" + } max_read_bytes: { description: """ Max amount of bytes to read from a single file before switching over to the next file. 
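
Before the next commit, a brief illustration of the drop semantics the docs above describe: oversized merged lines are flagged per file and suppressed at flush time rather than emitted. This is a self-contained sketch using plain std types; `Bucket` and the field name mirror partial_events_merger.rs for readability, but the code is a stand-in, not the patch's implementation.

// Illustrative stand-in for the per-file merge bucket; not the patch's code.
struct Bucket {
    merged: Vec<u8>,
    exceeds_max_merged_line_limit: bool,
}

fn append(bucket: &mut Bucket, chunk: &[u8], limit: Option<usize>) {
    if bucket.exceeds_max_merged_line_limit {
        return; // already oversized; skip further work for this line
    }
    bucket.merged.extend_from_slice(chunk);
    if let Some(max) = limit {
        if bucket.merged.len() > max {
            // mark it so the flush/remove path drops it instead of emitting
            bucket.exceeds_max_merged_line_limit = true;
        }
    }
}

fn main() {
    let mut b = Bucket { merged: Vec::new(), exceeds_max_merged_line_limit: false };
    append(&mut b, &[0u8; 20], Some(24));
    append(&mut b, &[0u8; 20], Some(24)); // 40 merged bytes exceed the 24-byte limit
    assert!(b.exceeds_max_merged_line_limit);
    append(&mut b, &[0u8; 20], None); // no limit configured: nothing is ever flagged
}
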
From 606292717576165f61a48b63ee671bf86fd980c0 Mon Sep 17 00:00:00 2001
From: Orri Ganel
Date: Wed, 14 May 2025 14:58:34 -0400
Subject: [PATCH 19/25] Tweak wording of doc; emit only first 1k bytes of dropped lines in error

---
 lib/file-source/src/buffer.rs                    | 17 ++++++++++-------
 lib/file-source/src/file_server.rs               |  4 ++--
 lib/file-source/src/file_watcher/mod.rs          | 14 +++++++-------
 lib/file-source/src/internal_events.rs           |  2 +-
 src/sources/kubernetes_logs/mod.rs               |  2 +-
 .../components/sources/base/kubernetes_logs.cue  |  2 +-
 6 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs
index bfdb860b0ddbf..5ff639ad8d9ea 100644
--- a/lib/file-source/src/buffer.rs
+++ b/lib/file-source/src/buffer.rs
@@ -7,7 +7,7 @@ use crate::FilePosition;

 pub struct ReadResult {
     pub successfully_read: Option<usize>,
-    pub discarded_for_size: Vec<BytesMut>,
+    pub discarded_for_size_and_truncated: Vec<BytesMut>,
 }

 /// Read up to `max_size` bytes from `reader`, splitting by `delim`
@@ -44,7 +44,7 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
     let mut discarding = false;
     let delim_finder = Finder::new(delim);
     let delim_len = delim.len();
-    let mut discarded_for_size = Vec::new();
+    let mut discarded_for_size_and_truncated = Vec::new();
     loop {
         let available: &[u8] = match reader.fill_buf() {
             Ok(n) => n,
@@ -73,7 +73,10 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
         total_read += used;

         if !discarding && buf.len() > max_size {
-            discarded_for_size.push(buf.clone());
+            // keep only the first 1k bytes to make sure we can actually emit a usable error
+            let mut to_truncate = buf.clone();
+            to_truncate.truncate(1000);
+            discarded_for_size_and_truncated.push(to_truncate);
             discarding = true;
         }

@@ -81,7 +84,7 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
             if !discarding {
                 return Ok(ReadResult {
                     successfully_read: Some(total_read),
-                    discarded_for_size,
+                    discarded_for_size_and_truncated,
                 });
             } else {
                 discarding = false;
@@ -94,7 +97,7 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
             // FileWatcher.
             return Ok(ReadResult {
                 successfully_read: None,
-                discarded_for_size,
+                discarded_for_size_and_truncated,
             });
         }
     }
@@ -193,7 +196,7 @@ mod test {
             {
                 ReadResult {
                     successfully_read: None,
-                    discarded_for_size: _,
+                    discarded_for_size_and_truncated: _,
                 } => {
                     // Subject only returns None if this is the last chunk _and_
                     // the chunk did not contain a delimiter _or_ the delimiter
                 }
                 ReadResult {
                     successfully_read: Some(total_read),
-                    discarded_for_size: _,
+                    discarded_for_size_and_truncated: _,
                 } => {
                     // Now that the function has returned we confirm that the
                     // returned details match our `first_delim` and also that
diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs
index c682e44a13e53..ebb1867339242 100644
--- a/lib/file-source/src/file_server.rs
+++ b/lib/file-source/src/file_server.rs
@@ -265,10 +265,10 @@ where
             let mut bytes_read: usize = 0;
             while let Ok(RawLineResult {
                 raw_line: Some(line),
-                discarded_for_size,
+                discarded_for_size_and_truncated,
             }) = watcher.read_line()
             {
-                discarded_for_size.iter().for_each(|buf| {
+                discarded_for_size_and_truncated.iter().for_each(|buf| {
                     self.emitter.emit_file_line_too_long(
                         &buf.clone(),
                         self.max_line_bytes,
diff --git a/lib/file-source/src/file_watcher/mod.rs b/lib/file-source/src/file_watcher/mod.rs
index 662374ad3c8e4..a8d4fd0f81641 100644
--- a/lib/file-source/src/file_watcher/mod.rs
+++ b/lib/file-source/src/file_watcher/mod.rs
@@ -33,7 +33,7 @@ pub(super) struct RawLine {
 #[derive(Debug)]
 pub struct RawLineResult {
     pub raw_line: Option<RawLine>,
-    pub discarded_for_size: Vec<BytesMut>,
+    pub discarded_for_size_and_truncated: Vec<BytesMut>,
 }

 /// The `FileWatcher` struct defines the polling based state machine which reads
@@ -230,7 +230,7 @@ impl FileWatcher {
         ) {
             Ok(ReadResult {
                 successfully_read: Some(_),
-                discarded_for_size,
+                discarded_for_size_and_truncated,
             }) => {
                 self.track_read_success();
                 Ok(RawLineResult {
                     raw_line: Some(RawLine {
                         offset: initial_position,
                         bytes: self.buf.split().freeze(),
                     }),
-                    discarded_for_size,
+                    discarded_for_size_and_truncated,
                 })
             }
             Ok(ReadResult {
                 successfully_read: None,
-                discarded_for_size,
+                discarded_for_size_and_truncated,
             }) => {
                 if !self.file_findable() {
                     self.set_dead();
@@ -256,7 +256,7 @@ impl FileWatcher {
                     self.reached_eof = true;
                     Ok(RawLineResult {
                         raw_line: None,
-                        discarded_for_size,
+                        discarded_for_size_and_truncated,
                     })
                 } else {
                     Ok(RawLineResult {
                         raw_line: Some(RawLine {
                             offset: initial_position,
                             bytes: buf,
                         }),
-                        discarded_for_size,
+                        discarded_for_size_and_truncated,
                     })
                 }
             } else {
                 self.reached_eof = true;
                 Ok(RawLineResult {
                     raw_line: None,
-                    discarded_for_size,
+                    discarded_for_size_and_truncated,
                 })
             }
         }
diff --git a/lib/file-source/src/internal_events.rs b/lib/file-source/src/internal_events.rs
index 726ec09f29999..3077a51d8cab9 100644
--- a/lib/file-source/src/internal_events.rs
+++ b/lib/file-source/src/internal_events.rs
@@ -31,7 +31,7 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {

     fn emit_file_line_too_long(
         &self,
-        bytes: &BytesMut,
+        truncated_bytes: &BytesMut,
         configured_limit: usize,
         encountered_size_so_far: usize,
     );
diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs
index d8c02704774e5..49129b1999d76 100644
--- a/src/sources/kubernetes_logs/mod.rs
+++ b/src/sources/kubernetes_logs/mod.rs
@@ -199,7 +199,7 @@ pub struct Config {
     ///
     /// Note that, if auto_partial_merge is false, this config will be ignored. Also, if max_line_bytes is too small to reach the continuation character, then this
     /// config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply
-    /// if auto_partial_merge is true, so if this is set to eg 1 MiB but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
+    /// if auto_partial_merge is true, so if this is set to be 1 MiB, for example, but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
     #[configurable(metadata(docs::type_unit = "bytes"))]
     max_merged_line_bytes: Option<usize>,

diff --git a/website/cue/reference/components/sources/base/kubernetes_logs.cue b/website/cue/reference/components/sources/base/kubernetes_logs.cue
index c933c8389cffe..6685ac47ce701 100644
--- a/website/cue/reference/components/sources/base/kubernetes_logs.cue
+++ b/website/cue/reference/components/sources/base/kubernetes_logs.cue
@@ -199,7 +199,7 @@ base: components: sources: kubernetes_logs: configuration: {
 			Note that, if auto_partial_merge is false, this config will be ignored. Also, if max_line_bytes is too small to reach the continuation character, then this
 			config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply
-			if auto_partial_merge is true, so if this is set to eg 1 MiB but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
+			if auto_partial_merge is true, so if this is set to be 1 MiB, for example, but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
 			"""
 		required: false
 		type: uint: unit: "bytes"

From 3b8edf6a2e23eecc3521db11a44d5e074600c733 Mon Sep 17 00:00:00 2001
From: Orri Ganel
Date: Wed, 14 May 2025 15:19:18 -0400
Subject: [PATCH 20/25] Rename fields for clarity

---
 src/internal_events/file.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs
index 1a12b3f0ba20c..4697c93c89ca7 100644
--- a/src/internal_events/file.rs
+++ b/src/internal_events/file.rs
@@ -500,7 +500,7 @@ mod source {

     #[derive(Debug)]
     pub struct FileLineTooBigError<'a> {
-        pub bytes: &'a BytesMut,
+        pub truncated_bytes: &'a BytesMut,
         pub configured_limit: usize,
         pub encountered_size_so_far: usize,
     }
@@ -509,7 +509,7 @@ mod source {
         fn emit(self) {
             error!(
                 message = "Found line that exceeds max_line_bytes; discarding.",
-                bytes = ?self.bytes,
+                truncated_bytes = ?self.truncated_bytes,
                 configured_limit = self.configured_limit,
                 encountered_size_so_far = self.encountered_size_so_far,
                 internal_log_rate_limit = true,
@@ -615,12 +615,12 @@ mod source {

         fn emit_file_line_too_long(
             &self,
-            bytes: &bytes::BytesMut,
+            truncated_bytes: &bytes::BytesMut,
             configured_limit: usize,
             encountered_size_so_far: usize,
         ) {
             emit!(FileLineTooBigError {
-                bytes,
+                truncated_bytes,
                 configured_limit,
                 encountered_size_so_far
             });

From 08b72923a1070eac403550053aca1da8e8ed7ec9 Mon Sep 17 00:00:00 2001
From: Orri Ganel
Date: Wed, 11 Jun 2025 15:29:04 -0400
Subject: [PATCH 21/25] Per PR feedback: copy just the initial 1000 bytes
 rather than cloning the whole buffer and then truncating, use more idiomatic
 Rust for handling both configured and unconfigured cases of
 max_merged_line_bytes

---
 lib/file-source/src/buffer.rs      | 6 +++---
 src/sources/kubernetes_logs/mod.rs | 6 +-----
 2 files changed, 4 insertions(+), 8 deletions(-)

diff
--git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs index 5ff639ad8d9ea..3b9b26bd33b91 100644 --- a/lib/file-source/src/buffer.rs +++ b/lib/file-source/src/buffer.rs @@ -74,9 +74,9 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( if !discarding && buf.len() > max_size { // keep only the first 1k bytes to make sure we can actually emit a usable error - let mut to_truncate = buf.clone(); - to_truncate.truncate(1000); - discarded_for_size_and_truncated.push(to_truncate); + let mut truncated: BytesMut = BytesMut::with_capacity(1000); + truncated.copy_from_slice(&buf[0..1000]); + discarded_for_size_and_truncated.push(truncated); discarding = true; } diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 49129b1999d76..4db231f8b3b19 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -793,11 +793,7 @@ impl Source { if auto_partial_merge { resolved_max_line_bytes = min( max_line_bytes, - if let Some(configured_max_merged_line_bytes) = max_merged_line_bytes { - configured_max_merged_line_bytes - } else { - max_line_bytes - }, + max_merged_line_bytes.unwrap_or(max_line_bytes), ); } From 335f6e2871d5aa1e7d851b28cb1eb550812d66d8 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Wed, 11 Jun 2025 15:41:53 -0400 Subject: [PATCH 22/25] Allow spelling of already-merged changelog filename --- .github/actions/spelling/expect.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index ee745e5c30d5e..82fea89d4fb57 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -175,6 +175,7 @@ efgh Elhage emerg Enableable +enableable endianess endler eni From e4efae0050fa046982eef1bba44f8c43c7e93025 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Wed, 11 Jun 2025 16:22:06 -0400 Subject: [PATCH 23/25] Don't try to include more characters than there actually are in the slice --- lib/file-source/src/buffer.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs index 3b9b26bd33b91..8401fe1f43c22 100644 --- a/lib/file-source/src/buffer.rs +++ b/lib/file-source/src/buffer.rs @@ -1,4 +1,4 @@ -use std::io::{self, BufRead}; +use std::{cmp::min, io::{self, BufRead}}; use bstr::Finder; use bytes::BytesMut; @@ -73,9 +73,10 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( total_read += used; if !discarding && buf.len() > max_size { - // keep only the first 1k bytes to make sure we can actually emit a usable error - let mut truncated: BytesMut = BytesMut::with_capacity(1000); - truncated.copy_from_slice(&buf[0..1000]); + // keep only the first <1k bytes to make sure we can actually emit a usable error + let length_to_keep = min(1000, max_size); + let mut truncated: BytesMut = BytesMut::with_capacity(length_to_keep); + truncated.copy_from_slice(&buf[0..length_to_keep]); discarded_for_size_and_truncated.push(truncated); discarding = true; } From 36b4c77d1bee17510a544659111ee80e48b3d23f Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Wed, 11 Jun 2025 16:33:04 -0400 Subject: [PATCH 24/25] Don't just get enough capacity, make sure length matches too --- lib/file-source/src/buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs index 8401fe1f43c22..4923fc0659bec 100644 --- a/lib/file-source/src/buffer.rs +++ b/lib/file-source/src/buffer.rs @@ -75,7 
+75,7 @@ pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( if !discarding && buf.len() > max_size { // keep only the first <1k bytes to make sure we can actually emit a usable error let length_to_keep = min(1000, max_size); - let mut truncated: BytesMut = BytesMut::with_capacity(length_to_keep); + let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep); truncated.copy_from_slice(&buf[0..length_to_keep]); discarded_for_size_and_truncated.push(truncated); discarding = true; From ffcfcc3270fda06f631e078a2f20911b1e9cfd86 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Thu, 12 Jun 2025 16:13:49 -0400 Subject: [PATCH 25/25] Formatting --- lib/file-source/src/buffer.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs index 4923fc0659bec..55dd481334e1d 100644 --- a/lib/file-source/src/buffer.rs +++ b/lib/file-source/src/buffer.rs @@ -1,4 +1,7 @@ -use std::{cmp::min, io::{self, BufRead}}; +use std::{ + cmp::min, + io::{self, BufRead}, +}; use bstr::Finder; use bytes::BytesMut;
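
A closing note on the truncation fix iterated on in the last three commits: `BytesMut::with_capacity` only reserves space and leaves the length at zero, so the intermediate `copy_from_slice` into it would panic on a length mismatch, while `BytesMut::zeroed` allocates and sizes the buffer in one step. The sketch below is a standalone illustration of the final behavior with made-up values; `truncate_for_error` is a hypothetical name, and the caller-side guarantee that `buf.len() > max_size` comes from the branch in read_until_with_max_size that only runs for oversized lines.

use std::cmp::min;

use bytes::BytesMut;

// Illustrative helper mirroring the final truncation logic: keep at most the
// first 1000 bytes of an oversized line so the error event stays small.
// Assumes buf.len() > max_size, which holds at the call site.
fn truncate_for_error(buf: &[u8], max_size: usize) -> BytesMut {
    let length_to_keep = min(1000, max_size);
    let mut truncated = BytesMut::zeroed(length_to_keep); // len == length_to_keep
    truncated.copy_from_slice(&buf[0..length_to_keep]); // lengths match, so no panic
    truncated
}

fn main() {
    let oversized = vec![b'x'; 5000]; // stands in for a line that blew past max_size
    assert_eq!(truncate_for_error(&oversized, 2048).len(), 1000);
    assert_eq!(truncate_for_error(&oversized, 512).len(), 512);
}

An equivalent formulation would start from `BytesMut::new()` and `extend_from_slice(&buf[..length_to_keep])`, which sizes and copies in one call; the zeroed-then-copy form above stays closest to the patch as merged.
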