From 498c2041aaf91ebea17f1eb3fd00e2cc53aa2554 Mon Sep 17 00:00:00 2001
From: Vitalii Parfonov
Date: Tue, 22 Jul 2025 15:11:21 +0300
Subject: [PATCH] Allow specification of a maximum line size to be applied after merging

Signed-off-by: Vitalii Parfonov
---
 Cargo.lock                                    |   1 -
 .../22581_max_merged_line_bytes.feature.md    |   3 +
 lib/file-source/src/buffer.rs                 |  55 +++++---
 lib/file-source/src/file_server.rs            |  16 ++-
 lib/file-source/src/file_watcher/mod.rs       |  54 ++++++--
 .../src/file_watcher/tests/experiment.rs      |   9 +-
 .../tests/experiment_no_truncations.rs        |  14 +-
 lib/file-source/src/fingerprinter.rs          |   3 +
 lib/file-source/src/internal_events.rs        |   9 ++
 src/internal_events/file.rs                   |  47 +++++++
 src/internal_events/kubernetes_logs.rs        |  35 ++++-
 src/sources/kubernetes_logs/mod.rs            |  30 ++++-
 .../kubernetes_logs/partial_events_merger.rs  | 123 ++++++++++++++++--
 .../sources/base/kubernetes_logs.cue          |  13 ++
 14 files changed, 357 insertions(+), 55 deletions(-)
 create mode 100644 changelog.d/22581_max_merged_line_bytes.feature.md

diff --git a/Cargo.lock b/Cargo.lock
index c2d9d280d0646..8069a5369abf8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6974,7 +6974,6 @@ checksum = "8288979acd84749c744a9014b4382d42b8f7b2592847b5afb2ed29e5d16ede07"
 dependencies = [
  "cc",
  "libc",
- "openssl-src",
  "pkg-config",
  "vcpkg",
 ]
diff --git a/changelog.d/22581_max_merged_line_bytes.feature.md b/changelog.d/22581_max_merged_line_bytes.feature.md
new file mode 100644
index 0000000000000..b5d36e53bde52
--- /dev/null
+++ b/changelog.d/22581_max_merged_line_bytes.feature.md
@@ -0,0 +1,3 @@
+The `kubernetes_logs` source now includes a new `max_merged_line_bytes` configuration option. This setting enables users to cap the size of log lines after they’ve been combined using `auto_partial_merge`. Previously, the `max_line_bytes` field only restricted line sizes *before* merging, leaving no practical way to limit the length of merged lines, unless you set a size so small that it prevented merging altogether by stopping short of the continuation character. This new option gives you better control over merged line sizes.
+
+authors: ganelo
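For context, here is a minimal sketch of how the new option might be set on a `kubernetes_logs` source in a Vector YAML config. The component name and byte values below are illustrative only and are not part of this patch:

```yaml
# Hypothetical vector.yaml snippet; names and sizes are examples, not defaults.
sources:
  k8s_logs:
    type: kubernetes_logs
    auto_partial_merge: true        # merging must be enabled for the new cap to apply
    max_line_bytes: 32768           # pre-merge cap on a single read line
    max_merged_line_bytes: 1048576  # new: cap applied after partial lines are merged
```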
diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs
index c8dbe1f400905..55dd481334e1d 100644
--- a/lib/file-source/src/buffer.rs
+++ b/lib/file-source/src/buffer.rs
@@ -1,11 +1,18 @@
-use std::io::{self, BufRead};
+use std::{
+    cmp::min,
+    io::{self, BufRead},
+};
 
 use bstr::Finder;
 use bytes::BytesMut;
-use tracing::warn;
 
 use crate::FilePosition;
 
+pub struct ReadResult {
+    pub successfully_read: Option<usize>,
+    pub discarded_for_size_and_truncated: Vec<BytesMut>,
+}
+
 /// Read up to `max_size` bytes from `reader`, splitting by `delim`
 ///
 /// The function reads up to `max_size` bytes from `reader`, splitting the input
@@ -29,17 +36,18 @@ use crate::FilePosition;
 /// Benchmarks indicate that this function processes in the high single-digit
 /// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
 /// the overhead of setup dominates our benchmarks.
-pub fn read_until_with_max_size<R: BufRead + ?Sized>(
-    reader: &mut R,
-    position: &mut FilePosition,
-    delim: &[u8],
-    buf: &mut BytesMut,
+pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
+    reader: &'a mut R,
+    position: &'a mut FilePosition,
+    delim: &'a [u8],
+    buf: &'a mut BytesMut,
     max_size: usize,
-) -> io::Result<Option<usize>> {
+) -> io::Result<ReadResult> {
     let mut total_read = 0;
     let mut discarding = false;
     let delim_finder = Finder::new(delim);
     let delim_len = delim.len();
+    let mut discarded_for_size_and_truncated = Vec::new();
     loop {
         let available: &[u8] = match reader.fill_buf() {
             Ok(n) => n,
@@ -68,16 +76,20 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
         total_read += used;
 
         if !discarding && buf.len() > max_size {
-            warn!(
-                message = "Found line that exceeds max_line_bytes; discarding.",
-                internal_log_rate_limit = true
-            );
+            // keep only the first <1k bytes to make sure we can actually emit a usable error
+            let length_to_keep = min(1000, max_size);
+            let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
+            truncated.copy_from_slice(&buf[0..length_to_keep]);
+            discarded_for_size_and_truncated.push(truncated);
             discarding = true;
         }
 
         if done {
             if !discarding {
-                return Ok(Some(total_read));
+                return Ok(ReadResult {
+                    successfully_read: Some(total_read),
+                    discarded_for_size_and_truncated,
+                });
             } else {
                 discarding = false;
                 buf.clear();
@@ -87,7 +99,10 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
             // us to observe an incomplete write. We return None here and let the loop continue
             // next time the method is called. This is safe because the buffer is specific to this
             // FileWatcher.
-            return Ok(None);
+            return Ok(ReadResult {
+                successfully_read: None,
+                discarded_for_size_and_truncated,
+            });
         }
     }
 }
@@ -99,6 +114,8 @@ mod test {
     use bytes::{BufMut, BytesMut};
     use quickcheck::{QuickCheck, TestResult};
 
+    use crate::buffer::ReadResult;
+
     use super::read_until_with_max_size;
 
     fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
@@ -181,7 +198,10 @@ mod test {
             )
             .unwrap()
             {
-                None => {
+                ReadResult {
+                    successfully_read: None,
+                    discarded_for_size_and_truncated: _,
+                } => {
                     // Subject only returns None if this is the last chunk _and_
                     // the chunk did not contain a delimiter _or_ the delimiter
                     // was outside the max_size range _or_ the current chunk is empty.
@@ -190,7 +210,10 @@ mod test {
                         .any(|details| ((details.chunk_index == idx) && details.within_max_size));
                     assert!(chunk.is_empty() || !has_valid_delimiter)
                 }
-                Some(total_read) => {
+                ReadResult {
+                    successfully_read: Some(total_read),
+                    discarded_for_size_and_truncated: _,
+                } => {
                     // Now that the function has returned we confirm that the
                     // returned details match our `first_delim` and also that
                     // the `buffer` is populated correctly.
diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs
index 1dfc6ebd08004..ebb1867339242 100644
--- a/lib/file-source/src/file_server.rs
+++ b/lib/file-source/src/file_server.rs
@@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace};
 
 use crate::{
     checkpointer::{Checkpointer, CheckpointsView},
-    file_watcher::FileWatcher,
+    file_watcher::{FileWatcher, RawLineResult},
     fingerprinter::{FileFingerprint, Fingerprinter},
     paths_provider::PathsProvider,
     FileSourceInternalEvents, ReadFrom,
@@ -263,7 +263,19 @@ where
                     let start = time::Instant::now();
                     let mut bytes_read: usize = 0;
-                    while let Ok(Some(line)) = watcher.read_line() {
+                    while let Ok(RawLineResult {
+                        raw_line: Some(line),
+                        discarded_for_size_and_truncated,
+                    }) = watcher.read_line()
+                    {
+                        discarded_for_size_and_truncated.iter().for_each(|buf| {
+                            self.emitter.emit_file_line_too_long(
+                                &buf.clone(),
+                                self.max_line_bytes,
+                                buf.len(),
+                            )
+                        });
+
                         let sz = line.bytes.len();
                         trace!(
                             message = "Read bytes.",
diff --git a/lib/file-source/src/file_watcher/mod.rs b/lib/file-source/src/file_watcher/mod.rs
index 80db2b9bc876c..a8d4fd0f81641 100644
--- a/lib/file-source/src/file_watcher/mod.rs
+++ b/lib/file-source/src/file_watcher/mod.rs
@@ -12,7 +12,9 @@ use tracing::debug;
 use vector_common::constants::GZIP_MAGIC;
 
 use crate::{
-    buffer::read_until_with_max_size, metadata_ext::PortableFileExt, FilePosition, ReadFrom,
+    buffer::{read_until_with_max_size, ReadResult},
+    metadata_ext::PortableFileExt,
+    FilePosition, ReadFrom,
 };
 #[cfg(test)]
 mod tests;
@@ -28,6 +30,12 @@ pub(super) struct RawLine {
     pub bytes: Bytes,
 }
 
+#[derive(Debug)]
+pub struct RawLineResult {
+    pub raw_line: Option<RawLine>,
+    pub discarded_for_size_and_truncated: Vec<BytesMut>,
+}
+
 /// The `FileWatcher` struct defines the polling based state machine which reads
 /// from a file path, transparently updating the underlying file descriptor when
 /// the file has been rolled over, as is common for logs.
@@ -207,7 +215,7 @@ impl FileWatcher {
     /// This function will attempt to read a new line from its file, blocking,
     /// up to some maximum but unspecified amount of time. `read_line` will open
     /// a new file handler as needed, transparently to the caller.
-    pub(super) fn read_line(&mut self) -> io::Result<Option<RawLine>> {
+    pub(super) fn read_line(&mut self) -> io::Result<RawLineResult> {
         self.track_read_attempt();
 
         let reader = &mut self.reader;
@@ -220,14 +228,23 @@ impl FileWatcher {
             &mut self.buf,
             self.max_line_bytes,
         ) {
-            Ok(Some(_)) => {
+            Ok(ReadResult {
+                successfully_read: Some(_),
+                discarded_for_size_and_truncated,
+            }) => {
                 self.track_read_success();
-                Ok(Some(RawLine {
-                    offset: initial_position,
-                    bytes: self.buf.split().freeze(),
-                }))
+                Ok(RawLineResult {
+                    raw_line: Some(RawLine {
+                        offset: initial_position,
+                        bytes: self.buf.split().freeze(),
+                    }),
+                    discarded_for_size_and_truncated,
+                })
             }
-            Ok(None) => {
+            Ok(ReadResult {
+                successfully_read: None,
+                discarded_for_size_and_truncated,
+            }) => {
                 if !self.file_findable() {
                     self.set_dead();
                     // File has been deleted, so return what we have in the buffer, even though it
@@ -237,16 +254,25 @@ impl FileWatcher {
                     if buf.is_empty() {
                         // EOF
                         self.reached_eof = true;
-                        Ok(None)
+                        Ok(RawLineResult {
+                            raw_line: None,
+                            discarded_for_size_and_truncated,
+                        })
                     } else {
-                        Ok(Some(RawLine {
-                            offset: initial_position,
-                            bytes: buf,
-                        }))
+                        Ok(RawLineResult {
+                            raw_line: Some(RawLine {
+                                offset: initial_position,
+                                bytes: buf,
+                            }),
+                            discarded_for_size_and_truncated,
+                        })
                     }
                 } else {
                     self.reached_eof = true;
-                    Ok(None)
+                    Ok(RawLineResult {
+                        raw_line: None,
+                        discarded_for_size_and_truncated,
+                    })
                 }
             }
             Err(e) => {
diff --git a/lib/file-source/src/file_watcher/tests/experiment.rs b/lib/file-source/src/file_watcher/tests/experiment.rs
index decdbdab98240..cbbfabe0a67b7 100644
--- a/lib/file-source/src/file_watcher/tests/experiment.rs
+++ b/lib/file-source/src/file_watcher/tests/experiment.rs
@@ -8,7 +8,7 @@ use bytes::Bytes;
 use quickcheck::{QuickCheck, TestResult};
 
 use crate::{
-    file_watcher::{tests::*, FileWatcher},
+    file_watcher::{tests::*, FileWatcher, RawLineResult},
     ReadFrom,
 };
 
@@ -96,11 +96,14 @@ fn experiment(actions: Vec<FileWatcherAction>) {
             Err(_) => {
                 unreachable!();
             }
-            Ok(Some(line)) if line.bytes.is_empty() => {
+            Ok(RawLineResult {
+                raw_line: Some(line),
+                ..
+            }) if line.bytes.is_empty() => {
                 attempts -= 1;
                 continue;
             }
-            Ok(None) => {
+            Ok(RawLineResult { raw_line: None, .. }) => {
                 attempts -= 1;
                 continue;
             }
diff --git a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs
index ee8a24a9f95bf..78aa7536b9a38 100644
--- a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs
+++ b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs
@@ -4,7 +4,7 @@ use bytes::Bytes;
 use quickcheck::{QuickCheck, TestResult};
 
 use crate::{
-    file_watcher::{tests::*, FileWatcher},
+    file_watcher::{tests::*, FileWatcher, RawLineResult},
     ReadFrom,
 };
 
@@ -63,17 +63,23 @@ fn experiment_no_truncations(actions: Vec<FileWatcherAction>) {
             Err(_) => {
                 unreachable!();
             }
-            Ok(Some(line)) if line.bytes.is_empty() => {
+            Ok(RawLineResult {
+                raw_line: Some(line),
+                ..
+            }) if line.bytes.is_empty() => {
                 attempts -= 1;
                 assert!(fwfiles[read_index].read_line().is_none());
                 continue;
             }
-            Ok(None) => {
+            Ok(RawLineResult { raw_line: None, .. }) => {
                 attempts -= 1;
                 assert!(fwfiles[read_index].read_line().is_none());
                 continue;
             }
-            Ok(Some(line)) => {
+            Ok(RawLineResult {
+                raw_line: Some(line),
+                ..
+            }) => {
                 let exp = fwfiles[read_index].read_line().expect("could not readline");
                 assert_eq!(exp.into_bytes(), line.bytes);
                 // assert_eq!(sz, buf.len() + 1);
diff --git a/lib/file-source/src/fingerprinter.rs b/lib/file-source/src/fingerprinter.rs
index f94cdc3a2ce9b..096e849059c21 100644
--- a/lib/file-source/src/fingerprinter.rs
+++ b/lib/file-source/src/fingerprinter.rs
@@ -392,6 +392,7 @@ mod test {
         time::Duration,
     };
 
+    use bytes::BytesMut;
     use flate2::write::GzEncoder;
     use tempfile::{tempdir, TempDir};
 
@@ -815,5 +816,7 @@ mod test {
         fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}
 
         fn emit_gave_up_on_deleted_file(&self, _: &Path) {}
+
+        fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
     }
 }
diff --git a/lib/file-source/src/internal_events.rs b/lib/file-source/src/internal_events.rs
index 80dbd7ac407ae..e2ab8fb5ec391 100644
--- a/lib/file-source/src/internal_events.rs
+++ b/lib/file-source/src/internal_events.rs
@@ -1,5 +1,7 @@
 use std::{io::Error, path::Path, time::Duration};
 
+use bytes::BytesMut;
+
 /// Every internal event in this crate has a corresponding
 /// method in this trait which should emit the event.
 pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
@@ -28,4 +30,11 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
     fn emit_path_globbing_failed(&self, path: &Path, error: &Error);
 
     fn emit_gave_up_on_deleted_file(&self, path: &Path);
+
+    fn emit_file_line_too_long(
+        &self,
+        truncated_bytes: &BytesMut,
+        configured_limit: usize,
+        encountered_size_so_far: usize,
+    );
 }
diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs
index 3ac85f71856b6..a1c83584470ed 100644
--- a/src/internal_events/file.rs
+++ b/src/internal_events/file.rs
@@ -106,8 +106,10 @@ impl InternalEvent for FileIoError<'_, P> {
 mod source {
     use std::{io::Error, path::Path, time::Duration};
 
+    use bytes::BytesMut;
     use metrics::counter;
     use vector_lib::file_source::FileSourceInternalEvents;
+    use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL};
 
     use super::{FileOpen, InternalEvent};
     use vector_lib::emit;
@@ -496,6 +498,38 @@ mod source {
         }
     }
 
+    #[derive(Debug)]
+    pub struct FileLineTooBigError<'a> {
+        pub truncated_bytes: &'a BytesMut,
+        pub configured_limit: usize,
+        pub encountered_size_so_far: usize,
+    }
+
+    impl InternalEvent for FileLineTooBigError<'_> {
+        fn emit(self) {
+            error!(
+                message = "Found line that exceeds max_line_bytes; discarding.",
+                truncated_bytes = ?self.truncated_bytes,
+                configured_limit = self.configured_limit,
+                encountered_size_so_far = self.encountered_size_so_far,
+                internal_log_rate_limit = true,
+                error_type = error_type::CONDITION_FAILED,
+                stage = error_stage::RECEIVING,
+            );
+            counter!(
+                "component_errors_total",
+                "error_code" => "reading_line_from_file",
+                "error_type" => error_type::CONDITION_FAILED,
+                "stage" => error_stage::RECEIVING,
+            )
+            .increment(1);
+            emit!(ComponentEventsDropped::<INTENTIONAL> {
+                count: 1,
+                reason: "Found line that exceeds max_line_bytes; discarding.",
+            });
+        }
+    }
+
     #[derive(Debug)]
     pub struct GaveUpOnDeletedFile<'a> {
         pub file: &'a Path,
@@ -602,5 +636,18 @@ mod source {
         fn emit_gave_up_on_deleted_file(&self, file: &Path) {
             emit!(GaveUpOnDeletedFile { file });
         }
+
+        fn emit_file_line_too_long(
+            &self,
+            truncated_bytes: &bytes::BytesMut,
+            configured_limit: usize,
+            encountered_size_so_far: usize,
+        ) {
+            emit!(FileLineTooBigError {
+                truncated_bytes,
+                configured_limit,
+                encountered_size_so_far
+            });
+        }
     }
 }
diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs
index 86d5c74af4ec5..06b0fb6bf577b 100644
--- a/src/internal_events/kubernetes_logs.rs
+++ b/src/internal_events/kubernetes_logs.rs
@@ -1,9 +1,10 @@
 use metrics::counter;
-use vector_lib::internal_event::InternalEvent;
+use vector_lib::internal_event::{InternalEvent, INTENTIONAL};
 use vector_lib::{
     internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL},
     json_size::JsonSize,
 };
+use vrl::core::Value;
 
 use crate::event::Event;
 
@@ -209,3 +210,35 @@ impl InternalEvent for KubernetesLifecycleError {
         });
     }
 }
+
+#[derive(Debug)]
+pub struct KubernetesMergedLineTooBigError<'a> {
+    pub event: &'a Value,
+    pub configured_limit: usize,
+    pub encountered_size_so_far: usize,
+}
+
+impl InternalEvent for KubernetesMergedLineTooBigError<'_> {
+    fn emit(self) {
+        error!(
+            message = "Found line that exceeds max_merged_line_bytes; discarding.",
+            event = ?self.event,
+            configured_limit = self.configured_limit,
+            encountered_size_so_far = self.encountered_size_so_far,
+            internal_log_rate_limit = true,
+            error_type = error_type::CONDITION_FAILED,
+            stage = error_stage::RECEIVING,
+        );
+        counter!(
+            "component_errors_total",
+            "error_code" => "reading_line_from_kubernetes_log",
+            "error_type" => error_type::CONDITION_FAILED,
+            "stage" => error_stage::RECEIVING,
+        )
+        .increment(1);
+        emit!(ComponentEventsDropped::<INTENTIONAL> {
+            count: 1,
+            reason: "Found line that exceeds max_merged_line_bytes; discarding.",
+        });
+    }
+}
diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs
index c2c9216176dce..df201aff95348 100644
--- a/src/sources/kubernetes_logs/mod.rs
+++ b/src/sources/kubernetes_logs/mod.rs
@@ -4,7 +4,7 @@
 //! running inside the cluster as a DaemonSet.
 
 #![deny(missing_docs)]
-use std::{path::PathBuf, time::Duration};
+use std::{cmp::min, path::PathBuf, time::Duration};
 
 use bytes::Bytes;
 use chrono::Utc;
@@ -193,6 +193,16 @@ pub struct Config {
     #[configurable(metadata(docs::type_unit = "bytes"))]
     max_line_bytes: usize,
 
+    /// The maximum number of bytes a line can contain - after merging - before being discarded.
+    ///
+    /// This protects against malformed lines or tailing incorrect files.
+    ///
+    /// Note that if `auto_partial_merge` is false, this config will be ignored. Also, if `max_line_bytes` is too small to reach the continuation character, then this
+    /// config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply
+    /// if `auto_partial_merge` is true, so if this is set to 1 MiB, for example, but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
+    #[configurable(metadata(docs::type_unit = "bytes"))]
+    max_merged_line_bytes: Option<usize>,
+
     /// The number of lines to read for generating the checksum.
     ///
     /// If your files share a common header that is not always a fixed size,
@@ -294,6 +304,7 @@ impl Default for Config {
             max_read_bytes: default_max_read_bytes(),
             oldest_first: default_oldest_first(),
             max_line_bytes: default_max_line_bytes(),
+            max_merged_line_bytes: None,
             fingerprint_lines: default_fingerprint_lines(),
             glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
             ingestion_timestamp_field: None,
@@ -553,6 +564,7 @@ struct Source {
     max_read_bytes: usize,
     oldest_first: bool,
     max_line_bytes: usize,
+    max_merged_line_bytes: Option<usize>,
    fingerprint_lines: usize,
     glob_minimum_cooldown: Duration,
     use_apiserver_cache: bool,
@@ -641,6 +653,7 @@ impl Source {
             max_read_bytes: config.max_read_bytes,
             oldest_first: config.oldest_first,
             max_line_bytes: config.max_line_bytes,
+            max_merged_line_bytes: config.max_merged_line_bytes,
             fingerprint_lines: config.fingerprint_lines,
             glob_minimum_cooldown,
             use_apiserver_cache: config.use_apiserver_cache,
@@ -676,6 +689,7 @@ impl Source {
             max_read_bytes,
             oldest_first,
             max_line_bytes,
+            max_merged_line_bytes,
             fingerprint_lines,
             glob_minimum_cooldown,
             use_apiserver_cache,
@@ -779,6 +793,14 @@ impl Source {
 
         let ignore_before = calculate_ignore_before(ignore_older_secs);
 
+        let mut resolved_max_line_bytes = max_line_bytes;
+        if auto_partial_merge {
+            resolved_max_line_bytes = min(
+                max_line_bytes,
+                max_merged_line_bytes.unwrap_or(max_line_bytes),
+            );
+        }
+
         // TODO: maybe more of the parameters have to be configurable.
 
         let checkpointer = Checkpointer::new(&data_dir);
@@ -803,7 +825,7 @@ impl Source {
             ignore_before,
             // The maximum number of bytes a line can contain before being discarded. This
             // protects against malformed lines or tailing incorrect files.
-            max_line_bytes,
+            max_line_bytes: resolved_max_line_bytes,
             // Delimiter bytes that is used to read the file line-by-line
             line_delimiter: Bytes::from("\n"),
             // The directory where to keep the checkpoints.
@@ -821,7 +843,7 @@ impl Source {
                 ignored_header_bytes: 0,
                 lines: fingerprint_lines,
             },
-            max_line_length: max_line_bytes,
+            max_line_length: resolved_max_line_bytes,
             ignore_not_found: true,
         },
         oldest_first,
@@ -898,7 +920,7 @@ impl Source {
         let (events_count, _) = events.size_hint();
 
         let mut stream = if auto_partial_merge {
-            merge_partial_events(events, log_namespace).left_stream()
+            merge_partial_events(events, log_namespace, max_merged_line_bytes).left_stream()
         } else {
             events.right_stream()
         };
diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs
index 14628df46ad6e..56752b49454cd 100644
--- a/src/sources/kubernetes_logs/partial_events_merger.rs
+++ b/src/sources/kubernetes_logs/partial_events_merger.rs
@@ -11,6 +11,7 @@ use vrl::owned_value_path;
 
 use crate::event;
 use crate::event::{Event, LogEvent, Value};
+use crate::internal_events::KubernetesMergedLineTooBigError;
 use crate::sources::kubernetes_logs::transform_utils::get_message_path;
 
 /// The key we use for `file` field.
@@ -20,6 +21,7 @@ const EXPIRATION_TIME: Duration = Duration::from_secs(30);
 
 struct PartialEventMergeState {
     buckets: HashMap<String, Bucket>,
+    maybe_max_merged_line_bytes: Option<usize>,
 }
 
 impl PartialEventMergeState {
@@ -30,38 +32,80 @@ impl PartialEventMergeState {
         message_path: &OwnedTargetPath,
         expiration_time: Duration,
     ) {
+        let mut bytes_mut = BytesMut::new();
         if let Some(bucket) = self.buckets.get_mut(file) {
+            // don't bother continuing to process new partial events that match existing ones that are already too big
+            if bucket.exceeds_max_merged_line_limit {
+                return;
+            }
+
             // merging with existing event
             if let (Some(Value::Bytes(prev_value)), Some(Value::Bytes(new_value))) =
                 (bucket.event.get_mut(message_path), event.get(message_path))
             {
-                let mut bytes_mut = BytesMut::new();
                 bytes_mut.extend_from_slice(prev_value);
                 bytes_mut.extend_from_slice(new_value);
+
+                // drop event if it's bigger than max allowed
+                if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
+                    if bytes_mut.len() > max_merged_line_bytes {
+                        bucket.exceeds_max_merged_line_limit = true;
+                        // perf impact of clone should be minimal since being here means no further processing of this event will occur
+                        emit!(KubernetesMergedLineTooBigError {
+                            event: &Value::Bytes(new_value.clone()),
+                            configured_limit: max_merged_line_bytes,
+                            encountered_size_so_far: bytes_mut.len()
+                        });
+                    }
+                }
+
                 *prev_value = bytes_mut.freeze();
             }
         } else {
             // new event
+
+            let mut exceeds_max_merged_line_limit = false;
+
+            if let Some(Value::Bytes(event_bytes)) = event.get(message_path) {
+                bytes_mut.extend_from_slice(event_bytes);
+                if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
+                    exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes;
+
+                    if exceeds_max_merged_line_limit {
+                        // perf impact of clone should be minimal since being here means no further processing of this event will occur
+                        emit!(KubernetesMergedLineTooBigError {
+                            event: &Value::Bytes(event_bytes.clone()),
+                            configured_limit: max_merged_line_bytes,
+                            encountered_size_so_far: bytes_mut.len()
+                        });
+                    }
+                }
+            }
+
             self.buckets.insert(
                 file.to_owned(),
                 Bucket {
                     event,
                     expiration: Instant::now() + expiration_time,
+                    exceeds_max_merged_line_limit,
                 },
             );
         }
     }
 
     fn remove_event(&mut self, file: &str) -> Option<LogEvent> {
-        self.buckets.remove(file).map(|bucket| bucket.event)
+        self.buckets
+            .remove(file)
+            .filter(|bucket| !bucket.exceeds_max_merged_line_limit)
+            .map(|bucket| bucket.event)
     }
 
     fn emit_expired_events(&mut self, emitter: &mut Emitter<LogEvent>) {
         let now = Instant::now();
         self.buckets.retain(|_key, bucket| {
             let expired = now >= bucket.expiration;
-            if expired {
+            if expired && !bucket.exceeds_max_merged_line_limit {
                 emitter.emit(bucket.event.clone());
             }
             !expired
@@ -70,7 +114,9 @@ impl PartialEventMergeState {
 
     fn flush_events(&mut self, emitter: &mut Emitter<LogEvent>) {
         for (_, bucket) in self.buckets.drain() {
-            emitter.emit(bucket.event);
+            if !bucket.exceeds_max_merged_line_limit {
+                emitter.emit(bucket.event);
+            }
         }
     }
 }
@@ -78,13 +124,20 @@ impl PartialEventMergeState {
 struct Bucket {
     event: LogEvent,
     expiration: Instant,
+    exceeds_max_merged_line_limit: bool,
 }
 
 pub fn merge_partial_events(
     stream: impl Stream<Item = Event> + 'static,
     log_namespace: LogNamespace,
+    maybe_max_merged_line_bytes: Option<usize>,
 ) -> impl Stream<Item = Event> {
-    merge_partial_events_with_custom_expiration(stream, log_namespace, EXPIRATION_TIME)
+    merge_partial_events_with_custom_expiration(
+        stream,
+        log_namespace,
+        EXPIRATION_TIME,
+        maybe_max_merged_line_bytes,
+    )
 }
 
 // internal function that allows customizing the expiration time (for testing)
@@ -92,6 +145,7 @@ fn merge_partial_events_with_custom_expiration(
     stream: impl Stream<Item = Event> + 'static,
     log_namespace: LogNamespace,
     expiration_time: Duration,
+    maybe_max_merged_line_bytes: Option<usize>,
 ) -> impl Stream<Item = Event> {
     let partial_flag_path = match log_namespace {
         LogNamespace::Vector => {
@@ -109,6 +163,7 @@ fn merge_partial_events_with_custom_expiration(
 
     let state = PartialEventMergeState {
         buckets: HashMap::new(),
+        maybe_max_merged_line_bytes,
     };
 
     let message_path = get_message_path(log_namespace);
@@ -164,7 +219,7 @@ mod test {
         e_1.insert("foo", 1);
 
         let input_stream = futures::stream::iter([e_1.into()]);
-        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);
 
         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -174,6 +229,18 @@ mod test {
         );
     }
 
+    #[tokio::test]
+    async fn merge_single_event_legacy_exceeds_max_merged_line_limit() {
+        let mut e_1 = LogEvent::from("test message 1");
+        e_1.insert("foo", 1);
+
+        let input_stream = futures::stream::iter([e_1.into()]);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(1));
+
+        let output: Vec<Event> = output_stream.collect().await;
+        assert_eq!(output.len(), 0);
+    }
+
     #[tokio::test]
     async fn merge_multiple_events_legacy() {
         let mut e_1 = LogEvent::from("test message 1");
@@ -184,7 +251,7 @@ mod test {
         e_2.insert("foo2", 1);
 
         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
-        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);
 
         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -194,6 +261,23 @@ mod test {
         );
     }
 
+    #[tokio::test]
+    async fn merge_multiple_events_legacy_exceeds_max_merged_line_limit() {
+        let mut e_1 = LogEvent::from("test message 1");
+        e_1.insert("foo", 1);
+        e_1.insert("_partial", true);
+
+        let mut e_2 = LogEvent::from("test message 2");
+        e_2.insert("foo2", 1);
+
+        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
+        // 24 > length of first message but less than the two combined
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));
+
+        let output: Vec<Event> = output_stream.collect().await;
+        assert_eq!(output.len(), 0);
+    }
+
     #[tokio::test]
     async fn multiple_events_flush_legacy() {
         let mut e_1 = LogEvent::from("test message 1");
@@ -205,7 +289,7 @@ mod test {
         e_1.insert("_partial", true);
 
         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
-        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);
 
         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -215,6 +299,24 @@ mod test {
         );
     }
 
+    #[tokio::test]
+    async fn multiple_events_flush_legacy_exceeds_max_merged_line_limit() {
+        let mut e_1 = LogEvent::from("test message 1");
+        e_1.insert("foo", 1);
+        e_1.insert("_partial", true);
+
+        let mut e_2 = LogEvent::from("test message 2");
+        e_2.insert("foo2", 1);
+        e_1.insert("_partial", true);
+
+        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
+        // 24 > length of first message but less than the two combined
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));
+
+        let output: Vec<Event> = output_stream.collect().await;
+        assert_eq!(output.len(), 0);
+    }
+
     #[tokio::test]
     async fn multiple_events_expire_legacy() {
         let mut e_1 = LogEvent::from("test message");
@@ -233,6 +335,7 @@ mod test {
             input_stream,
             LogNamespace::Legacy,
             Duration::from_secs(1),
+            None,
         );
 
         let output: Vec<Event> = output_stream.take(2).collect().await;
@@ -256,7 +359,7 @@ mod test {
         );
 
         let input_stream = futures::stream::iter([e_1.into()]);
-        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);
 
         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
@@ -286,7 +389,7 @@ mod test {
         );
 
         let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
-        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector);
+        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);
 
         let output: Vec<Event> = output_stream.collect().await;
         assert_eq!(output.len(), 1);
diff --git a/website/cue/reference/components/sources/base/kubernetes_logs.cue b/website/cue/reference/components/sources/base/kubernetes_logs.cue
index 5895f56908506..f07e3b317e5a0 100644
--- a/website/cue/reference/components/sources/base/kubernetes_logs.cue
+++ b/website/cue/reference/components/sources/base/kubernetes_logs.cue
@@ -191,6 +191,19 @@ base: components: sources: kubernetes_logs: configuration: {
 			unit: "bytes"
 		}
 	}
+	max_merged_line_bytes: {
+		description: """
+			The maximum number of bytes a line can contain - after merging - before being discarded.
+
+			This protects against malformed lines or tailing incorrect files.
+
+			Note that if `auto_partial_merge` is false, this config will be ignored. Also, if `max_line_bytes` is too small to reach the continuation character, then this
+			config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply
+			if `auto_partial_merge` is true, so if this is set to 1 MiB, for example, but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
+			"""
+		required: false
+		type: uint: unit: "bytes"
+	}
 	max_read_bytes: {
 		description: """
 			Max amount of bytes to read from a single file before switching over to the next file.