Merged

Changes from all commits (29 commits)
1013e9f
Add config for maximum allowed line size after merging
oganel Feb 27, 2025
2b3ea0a
Add warns when we drop partial logs for being too big; shift some com…
oganel Feb 27, 2025
cd9d607
Add changelog
oganel Mar 3, 2025
dd5fad0
Format
oganel Mar 3, 2025
354008c
Increment component_discarded_events_total on violation of max_line_s…
oganel Mar 7, 2025
0d1b806
Update changelog.d/22581_max_merged_line_bytes.feature.md
ganelo Mar 13, 2025
8e0c994
Don't emit expired events that are too big nor ones that don't appear…
oganel Mar 27, 2025
cae7b01
Fix another test
oganel Mar 28, 2025
bc23d35
Update src/sources/kubernetes_logs/mod.rs
ganelo Mar 31, 2025
62bce6c
Update src/sources/kubernetes_logs/mod.rs
ganelo Mar 31, 2025
4e93bc9
Remove inadvertently added file
oganel Mar 31, 2025
f9d7ca8
Include Value rather than Event in error struct
oganel Mar 31, 2025
6e58258
Rename field in bucket struct
oganel Mar 31, 2025
e551a4c
Move max_merged_line_bytes from being a param to being a field on the…
oganel Mar 31, 2025
7bac39a
Make new config field optional, defaulting to old behavior
oganel Apr 3, 2025
4975ebe
Format
oganel Apr 3, 2025
af11a17
Merge remote-tracking branch 'origin/master' into og/max_merged_line_…
pront Apr 8, 2025
7ca79d0
Appease check-events
oganel Apr 10, 2025
d82e92c
Merge branch 'og/max_merged_line_bytes' of https://github.com/ganelo/…
oganel Apr 10, 2025
b51c3df
Merge remote-tracking branch 'origin/master' into og/max_merged_line_…
pront Apr 28, 2025
c9919b4
docs regen
pront Apr 28, 2025
6062927
Tweak wording of doc; emit only first 1k bytes of dropped lines in error
oganel May 14, 2025
3b8edf6
Rename fields for clarity
oganel May 14, 2025
08b7292
Per PR feedback: copy just the initial 1000 bytes rather than cloning…
oganel Jun 11, 2025
bfe4a25
Merge branch 'master' into og/max_merged_line_bytes
oganel Jun 11, 2025
335f6e2
Allow spelling of already-merged changelog filename
oganel Jun 11, 2025
e4efae0
Don't try to include more characters than there actually are in the s…
oganel Jun 11, 2025
36b4c77
Don't just get enough capacity, make sure length matches too
oganel Jun 11, 2025
ffcfcc3
Formatting
oganel Jun 12, 2025
1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
@@ -174,7 +174,8 @@
efgh
Elhage
emerg
Enableable

[GitHub Actions / Check Spelling, warning on line 177 of .github/actions/spelling/expect.txt: `Enableable` is ignored by check-spelling because another more general variant is also in expect. (ignored-expect-variant)]
enableable
endianess
endler
eni
3 changes: 3 additions & 0 deletions changelog.d/22581_max_merged_line_bytes.feature.md
@@ -0,0 +1,3 @@
The `kubernetes_logs` source now includes a new `max_merged_line_bytes` configuration option. This setting lets users cap the size of log lines after they have been combined via `auto_partial_merge`. Previously, the `max_line_bytes` field only restricted line sizes *before* merging, leaving no practical way to limit the length of merged lines short of setting a size so small that it prevented merging altogether by cutting off the continuation character. The new option gives better control over merged line sizes.

authors: ganelo
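
For orientation, a sketch of how the option might be configured (values are illustrative, not recommendations; `auto_partial_merge` and `max_line_bytes` are pre-existing options of this source):

sources:
  k8s:
    type: kubernetes_logs
    auto_partial_merge: true        # merge partial lines split by the container runtime
    max_line_bytes: 32768           # pre-merge cap on a single read line
    max_merged_line_bytes: 1048576  # new: cap applied after partial lines are merged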
55 changes: 39 additions & 16 deletions lib/file-source/src/buffer.rs
@@ -1,11 +1,18 @@
use std::io::{self, BufRead};
use std::{
cmp::min,
io::{self, BufRead},
};

use bstr::Finder;
use bytes::BytesMut;
use tracing::warn;

use crate::FilePosition;

pub struct ReadResult {
pub successfully_read: Option<usize>,
pub discarded_for_size_and_truncated: Vec<BytesMut>,
}

/// Read up to `max_size` bytes from `reader`, splitting by `delim`
///
/// The function reads up to `max_size` bytes from `reader`, splitting the input
@@ -29,17 +36,18 @@ use crate::FilePosition;
/// Benchmarks indicate that this function processes in the high single-digit
/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
/// the overhead of setup dominates our benchmarks.
pub fn read_until_with_max_size<R: BufRead + ?Sized>(
reader: &mut R,
position: &mut FilePosition,
delim: &[u8],
buf: &mut BytesMut,
pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
reader: &'a mut R,
position: &'a mut FilePosition,
delim: &'a [u8],
buf: &'a mut BytesMut,
max_size: usize,
) -> io::Result<Option<usize>> {
) -> io::Result<ReadResult> {
let mut total_read = 0;
let mut discarding = false;
let delim_finder = Finder::new(delim);
let delim_len = delim.len();
let mut discarded_for_size_and_truncated = Vec::new();
loop {
let available: &[u8] = match reader.fill_buf() {
Ok(n) => n,
@@ -68,16 +76,20 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
total_read += used;

if !discarding && buf.len() > max_size {
warn!(
message = "Found line that exceeds max_line_bytes; discarding.",
internal_log_rate_limit = true
);

Review thread on this change:

Member: I am not very familiar with this code. I wonder why this warn! was moved only to be emitted later.

Contributor (PR author): This was part of plumbing through the actual contents that were being dropped so they could be included in the error.

// keep only the first <1k bytes to make sure we can actually emit a usable error
let length_to_keep = min(1000, max_size);
let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
truncated.copy_from_slice(&buf[0..length_to_keep]);
discarded_for_size_and_truncated.push(truncated);
discarding = true;
}

if done {
if !discarding {
return Ok(Some(total_read));
return Ok(ReadResult {
successfully_read: Some(total_read),
discarded_for_size_and_truncated,
});
} else {
discarding = false;
buf.clear();
@@ -87,7 +99,10 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
// us to observe an incomplete write. We return None here and let the loop continue
// next time the method is called. This is safe because the buffer is specific to this
// FileWatcher.
return Ok(None);
return Ok(ReadResult {
successfully_read: None,
discarded_for_size_and_truncated,
});
}
}
}
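
A note on the `BytesMut::zeroed` call in the truncation logic above (see also the later commit "Don't just get enough capacity, make sure length matches too"): `with_capacity` only reserves space and leaves the length at zero, so `copy_from_slice`, which requires source and destination lengths to match, would panic. A standalone sketch, not part of the PR, illustrating the distinction:

use bytes::BytesMut;

fn main() {
    let src = b"abcdef";

    // `with_capacity` reserves space but leaves len() == 0, so a
    // `copy_from_slice` into it would panic on length mismatch.
    let reserved = BytesMut::with_capacity(3);
    assert_eq!(reserved.len(), 0);

    // `zeroed` sets capacity *and* length, giving `copy_from_slice`
    // a correctly sized destination.
    let mut truncated = BytesMut::zeroed(3);
    truncated.copy_from_slice(&src[..3]);
    assert_eq!(&truncated[..], b"abc");
}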
@@ -99,6 +114,8 @@ mod test {
use bytes::{BufMut, BytesMut};
use quickcheck::{QuickCheck, TestResult};

use crate::buffer::ReadResult;

use super::read_until_with_max_size;

fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
@@ -181,7 +198,10 @@
)
.unwrap()
{
None => {
ReadResult {
successfully_read: None,
discarded_for_size_and_truncated: _,
} => {
// Subject only returns None if this is the last chunk _and_
// the chunk did not contain a delimiter _or_ the delimiter
// was outside the max_size range _or_ the current chunk is empty.
@@ -190,7 +210,10 @@
.any(|details| ((details.chunk_index == idx) && details.within_max_size));
assert!(chunk.is_empty() || !has_valid_delimiter)
}
Some(total_read) => {
ReadResult {
successfully_read: Some(total_read),
discarded_for_size_and_truncated: _,
} => {
// Now that the function has returned we confirm that the
// returned details match our `first_delim` and also that
// the `buffer` is populated correctly.
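To summarize the new contract of `read_until_with_max_size`, here is a minimal, test-style sketch (not part of the PR), written as if inside the file-source crate so that `read_until_with_max_size` and `ReadResult` are reachable the same way they are for the tests above:

use std::io::Cursor;

use bytes::BytesMut;

use crate::buffer::{read_until_with_max_size, ReadResult};

fn oversize_line_is_reported() -> std::io::Result<()> {
    // The first line (64 bytes) exceeds max_size and is discarded; a copy
    // truncated to min(1000, max_size) bytes is kept for error reporting.
    // The second line fits and lands in `buf`.
    let input = [vec![b'x'; 64], b"\nshort\n".to_vec()].concat();
    let mut reader = Cursor::new(input);
    let mut position: u64 = 0; // FilePosition is a u64 byte offset
    let mut buf = BytesMut::new();

    let ReadResult {
        successfully_read,
        discarded_for_size_and_truncated,
    } = read_until_with_max_size(&mut reader, &mut position, b"\n", &mut buf, 16)?;

    assert!(successfully_read.is_some()); // a complete line was read
    assert_eq!(&buf[..], b"short"); // the delimiter is not included
    assert_eq!(discarded_for_size_and_truncated.len(), 1);
    assert_eq!(discarded_for_size_and_truncated[0].len(), 16); // min(1000, max_size)
    Ok(())
}
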
16 changes: 14 additions & 2 deletions lib/file-source/src/file_server.rs
@@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace};

use crate::{
checkpointer::{Checkpointer, CheckpointsView},
file_watcher::FileWatcher,
file_watcher::{FileWatcher, RawLineResult},
fingerprinter::{FileFingerprint, Fingerprinter},
paths_provider::PathsProvider,
FileSourceInternalEvents, ReadFrom,
@@ -263,7 +263,19 @@

let start = time::Instant::now();
let mut bytes_read: usize = 0;
while let Ok(Some(line)) = watcher.read_line() {
while let Ok(RawLineResult {
raw_line: Some(line),
discarded_for_size_and_truncated,
}) = watcher.read_line()
{
discarded_for_size_and_truncated.iter().for_each(|buf| {
self.emitter.emit_file_line_too_long(
&buf.clone(),
self.max_line_bytes,
buf.len(),
)
});

let sz = line.bytes.len();
trace!(
message = "Read bytes.",
54 changes: 40 additions & 14 deletions lib/file-source/src/file_watcher/mod.rs
@@ -12,7 +12,9 @@ use tracing::debug;
use vector_common::constants::GZIP_MAGIC;

use crate::{
buffer::read_until_with_max_size, metadata_ext::PortableFileExt, FilePosition, ReadFrom,
buffer::{read_until_with_max_size, ReadResult},
metadata_ext::PortableFileExt,
FilePosition, ReadFrom,
};
#[cfg(test)]
mod tests;
@@ -28,6 +30,12 @@ pub(super) struct RawLine {
pub bytes: Bytes,
}

#[derive(Debug)]
pub struct RawLineResult {
pub raw_line: Option<RawLine>,
pub discarded_for_size_and_truncated: Vec<BytesMut>,
}

/// The `FileWatcher` struct defines the polling based state machine which reads
/// from a file path, transparently updating the underlying file descriptor when
/// the file has been rolled over, as is common for logs.
@@ -207,7 +215,7 @@ impl FileWatcher {
/// This function will attempt to read a new line from its file, blocking,
/// up to some maximum but unspecified amount of time. `read_line` will open
/// a new file handler as needed, transparently to the caller.
pub(super) fn read_line(&mut self) -> io::Result<Option<RawLine>> {
pub(super) fn read_line(&mut self) -> io::Result<RawLineResult> {
self.track_read_attempt();

let reader = &mut self.reader;
@@ -220,14 +228,23 @@
&mut self.buf,
self.max_line_bytes,
) {
Ok(Some(_)) => {
Ok(ReadResult {
successfully_read: Some(_),
discarded_for_size_and_truncated,
}) => {
self.track_read_success();
Ok(Some(RawLine {
offset: initial_position,
bytes: self.buf.split().freeze(),
}))
Ok(RawLineResult {
raw_line: Some(RawLine {
offset: initial_position,
bytes: self.buf.split().freeze(),
}),
discarded_for_size_and_truncated,
})
}
Ok(None) => {
Ok(ReadResult {
successfully_read: None,
discarded_for_size_and_truncated,
}) => {
if !self.file_findable() {
self.set_dead();
// File has been deleted, so return what we have in the buffer, even though it
@@ -237,16 +254,25 @@
if buf.is_empty() {
// EOF
self.reached_eof = true;
Ok(None)
Ok(RawLineResult {
raw_line: None,
discarded_for_size_and_truncated,
})
} else {
Ok(Some(RawLine {
offset: initial_position,
bytes: buf,
}))
Ok(RawLineResult {
raw_line: Some(RawLine {
offset: initial_position,
bytes: buf,
}),
discarded_for_size_and_truncated,
})
}
} else {
self.reached_eof = true;
Ok(None)
Ok(RawLineResult {
raw_line: None,
discarded_for_size_and_truncated,
})
}
}
Err(e) => {
9 changes: 6 additions & 3 deletions lib/file-source/src/file_watcher/tests/experiment.rs
@@ -8,7 +8,7 @@ use bytes::Bytes;
use quickcheck::{QuickCheck, TestResult};

use crate::{
file_watcher::{tests::*, FileWatcher},
file_watcher::{tests::*, FileWatcher, RawLineResult},
ReadFrom,
};

@@ -96,11 +96,14 @@ fn experiment(actions: Vec<FileWatcherAction>) {
Err(_) => {
unreachable!();
}
Ok(Some(line)) if line.bytes.is_empty() => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) if line.bytes.is_empty() => {
attempts -= 1;
continue;
}
Ok(None) => {
Ok(RawLineResult { raw_line: None, .. }) => {
attempts -= 1;
continue;
}
lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs
@@ -4,7 +4,7 @@ use bytes::Bytes;
use quickcheck::{QuickCheck, TestResult};

use crate::{
file_watcher::{tests::*, FileWatcher},
file_watcher::{tests::*, FileWatcher, RawLineResult},
ReadFrom,
};

@@ -63,17 +63,23 @@ fn experiment_no_truncations(actions: Vec<FileWatcherAction>) {
Err(_) => {
unreachable!();
}
Ok(Some(line)) if line.bytes.is_empty() => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) if line.bytes.is_empty() => {
attempts -= 1;
assert!(fwfiles[read_index].read_line().is_none());
continue;
}
Ok(None) => {
Ok(RawLineResult { raw_line: None, .. }) => {
attempts -= 1;
assert!(fwfiles[read_index].read_line().is_none());
continue;
}
Ok(Some(line)) => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) => {
let exp = fwfiles[read_index].read_line().expect("could not readline");
assert_eq!(exp.into_bytes(), line.bytes);
// assert_eq!(sz, buf.len() + 1);
3 changes: 3 additions & 0 deletions lib/file-source/src/fingerprinter.rs
@@ -392,6 +392,7 @@ mod test {
time::Duration,
};

use bytes::BytesMut;
use flate2::write::GzEncoder;
use tempfile::{tempdir, TempDir};

@@ -813,5 +814,7 @@
fn emit_files_open(&self, _: usize) {}

fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
}
}
9 changes: 9 additions & 0 deletions lib/file-source/src/internal_events.rs
@@ -1,5 +1,7 @@
use std::{io::Error, path::Path, time::Duration};

use bytes::BytesMut;

/// Every internal event in this crate has a corresponding
/// method in this trait which should emit the event.
pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
Expand All @@ -26,4 +28,11 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
fn emit_files_open(&self, count: usize);

fn emit_path_globbing_failed(&self, path: &Path, error: &Error);

fn emit_file_line_too_long(
&self,
truncated_bytes: &BytesMut,
configured_limit: usize,
encountered_size_so_far: usize,
);
}
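
To make the new hook concrete, a hedged sketch of one possible implementor follows; the struct, message text, and logging approach are illustrative only, and Vector's actual internal event wiring lives elsewhere and may differ:

use bytes::BytesMut;
use tracing::warn;

// Hypothetical emitter used only for illustration.
#[derive(Clone)]
struct LoggingEmitter;

impl LoggingEmitter {
    fn emit_file_line_too_long(
        &self,
        truncated_bytes: &BytesMut,
        configured_limit: usize,
        encountered_size_so_far: usize,
    ) {
        // Log only the pre-truncated preview (at most ~1000 bytes by
        // construction), never the full oversized line.
        warn!(
            message = "Found line that exceeds the configured limit; discarding.",
            preview = %String::from_utf8_lossy(truncated_bytes),
            configured_limit = configured_limit,
            encountered_size_so_far = encountered_size_so_far,
        );
    }
}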