Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions changelog.d/22581_max_merged_line_bytes.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `kubernetes_logs` source now includes a new `max_merged_line_bytes` configuration option. This setting enables users to cap the size of log lines after they’ve been combined using `auto_partial_merge`. Previously, the `max_line_bytes` field only restricted line sizes *before* merging, leaving no practical way to limit the length of merged lines—unless you set a size so tiny that it prevented merging altogether by stopping short of the continuation character. This new option gives you better control over merged line sizes.

authors: ganelo
55 changes: 39 additions & 16 deletions lib/file-source/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use std::io::{self, BufRead};
use std::{
cmp::min,
io::{self, BufRead},
};

use bstr::Finder;
use bytes::BytesMut;
use tracing::warn;

use crate::FilePosition;

/// Outcome of a single `read_until_with_max_size` call.
pub struct ReadResult {
    /// Total bytes consumed from the reader when a complete, delimited
    /// line was read (`Some(total_read)`), or `None` when no complete
    /// line was available yet — e.g. the underlying reader would block
    /// mid-line and the partial data is left in the caller's buffer.
    pub successfully_read: Option<usize>,
    /// Truncated prefixes (capped at `min(1000, max_size)` bytes each)
    /// of lines that exceeded `max_size` and were discarded during this
    /// read, retained so the caller can emit a usable "line too long"
    /// event with a sample of the offending data.
    pub discarded_for_size_and_truncated: Vec<BytesMut>,
}

/// Read up to `max_size` bytes from `reader`, splitting by `delim`
///
/// The function reads up to `max_size` bytes from `reader`, splitting the input
Expand All @@ -29,17 +36,18 @@ use crate::FilePosition;
/// Benchmarks indicate that this function processes in the high single-digit
/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
/// the overhead of setup dominates our benchmarks.
pub fn read_until_with_max_size<R: BufRead + ?Sized>(
reader: &mut R,
position: &mut FilePosition,
delim: &[u8],
buf: &mut BytesMut,
pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
reader: &'a mut R,
position: &'a mut FilePosition,
delim: &'a [u8],
buf: &'a mut BytesMut,
max_size: usize,
) -> io::Result<Option<usize>> {
) -> io::Result<ReadResult> {
let mut total_read = 0;
let mut discarding = false;
let delim_finder = Finder::new(delim);
let delim_len = delim.len();
let mut discarded_for_size_and_truncated = Vec::new();
loop {
let available: &[u8] = match reader.fill_buf() {
Ok(n) => n,
Expand Down Expand Up @@ -68,16 +76,20 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
total_read += used;

if !discarding && buf.len() > max_size {
warn!(
message = "Found line that exceeds max_line_bytes; discarding.",
internal_log_rate_limit = true
);
// keep only the first <1k bytes to make sure we can actually emit a usable error
let length_to_keep = min(1000, max_size);
let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
truncated.copy_from_slice(&buf[0..length_to_keep]);
discarded_for_size_and_truncated.push(truncated);
discarding = true;
}

if done {
if !discarding {
return Ok(Some(total_read));
return Ok(ReadResult {
successfully_read: Some(total_read),
discarded_for_size_and_truncated,
});
} else {
discarding = false;
buf.clear();
Expand All @@ -87,7 +99,10 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
// us to observe an incomplete write. We return None here and let the loop continue
// next time the method is called. This is safe because the buffer is specific to this
// FileWatcher.
return Ok(None);
return Ok(ReadResult {
successfully_read: None,
discarded_for_size_and_truncated,
});
}
}
}
Expand All @@ -99,6 +114,8 @@ mod test {
use bytes::{BufMut, BytesMut};
use quickcheck::{QuickCheck, TestResult};

use crate::buffer::ReadResult;

use super::read_until_with_max_size;

fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
Expand Down Expand Up @@ -181,7 +198,10 @@ mod test {
)
.unwrap()
{
None => {
ReadResult {
successfully_read: None,
discarded_for_size_and_truncated: _,
} => {
// Subject only returns None if this is the last chunk _and_
// the chunk did not contain a delimiter _or_ the delimiter
// was outside the max_size range _or_ the current chunk is empty.
Expand All @@ -190,7 +210,10 @@ mod test {
.any(|details| ((details.chunk_index == idx) && details.within_max_size));
assert!(chunk.is_empty() || !has_valid_delimiter)
}
Some(total_read) => {
ReadResult {
successfully_read: Some(total_read),
discarded_for_size_and_truncated: _,
} => {
// Now that the function has returned we confirm that the
// returned details match our `first_delim` and also that
// the `buffer` is populated correctly.
Expand Down
16 changes: 14 additions & 2 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace};

use crate::{
checkpointer::{Checkpointer, CheckpointsView},
file_watcher::FileWatcher,
file_watcher::{FileWatcher, RawLineResult},
fingerprinter::{FileFingerprint, Fingerprinter},
paths_provider::PathsProvider,
FileSourceInternalEvents, ReadFrom,
Expand Down Expand Up @@ -263,7 +263,19 @@ where

let start = time::Instant::now();
let mut bytes_read: usize = 0;
while let Ok(Some(line)) = watcher.read_line() {
while let Ok(RawLineResult {
raw_line: Some(line),
discarded_for_size_and_truncated,
}) = watcher.read_line()
{
discarded_for_size_and_truncated.iter().for_each(|buf| {
self.emitter.emit_file_line_too_long(
&buf.clone(),
self.max_line_bytes,
buf.len(),
)
});

let sz = line.bytes.len();
trace!(
message = "Read bytes.",
Expand Down
54 changes: 40 additions & 14 deletions lib/file-source/src/file_watcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use tracing::debug;
use vector_common::constants::GZIP_MAGIC;

use crate::{
buffer::read_until_with_max_size, metadata_ext::PortableFileExt, FilePosition, ReadFrom,
buffer::{read_until_with_max_size, ReadResult},
metadata_ext::PortableFileExt,
FilePosition, ReadFrom,
};
#[cfg(test)]
mod tests;
Expand All @@ -28,6 +30,12 @@ pub(super) struct RawLine {
pub bytes: Bytes,
}

/// Outcome of a single `FileWatcher::read_line` call.
#[derive(Debug)]
pub struct RawLineResult {
    /// The line that was read, or `None` when EOF was reached (or no
    /// complete line is available yet).
    pub raw_line: Option<RawLine>,
    /// Truncated prefixes of over-long lines discarded during this read,
    /// propagated up from the buffered read so the caller (the file
    /// server) can emit a "line too long" event per discarded line.
    pub discarded_for_size_and_truncated: Vec<BytesMut>,
}

/// The `FileWatcher` struct defines the polling based state machine which reads
/// from a file path, transparently updating the underlying file descriptor when
/// the file has been rolled over, as is common for logs.
Expand Down Expand Up @@ -207,7 +215,7 @@ impl FileWatcher {
/// This function will attempt to read a new line from its file, blocking,
/// up to some maximum but unspecified amount of time. `read_line` will open
/// a new file handler as needed, transparently to the caller.
pub(super) fn read_line(&mut self) -> io::Result<Option<RawLine>> {
pub(super) fn read_line(&mut self) -> io::Result<RawLineResult> {
self.track_read_attempt();

let reader = &mut self.reader;
Expand All @@ -220,14 +228,23 @@ impl FileWatcher {
&mut self.buf,
self.max_line_bytes,
) {
Ok(Some(_)) => {
Ok(ReadResult {
successfully_read: Some(_),
discarded_for_size_and_truncated,
}) => {
self.track_read_success();
Ok(Some(RawLine {
offset: initial_position,
bytes: self.buf.split().freeze(),
}))
Ok(RawLineResult {
raw_line: Some(RawLine {
offset: initial_position,
bytes: self.buf.split().freeze(),
}),
discarded_for_size_and_truncated,
})
}
Ok(None) => {
Ok(ReadResult {
successfully_read: None,
discarded_for_size_and_truncated,
}) => {
if !self.file_findable() {
self.set_dead();
// File has been deleted, so return what we have in the buffer, even though it
Expand All @@ -237,16 +254,25 @@ impl FileWatcher {
if buf.is_empty() {
// EOF
self.reached_eof = true;
Ok(None)
Ok(RawLineResult {
raw_line: None,
discarded_for_size_and_truncated,
})
} else {
Ok(Some(RawLine {
offset: initial_position,
bytes: buf,
}))
Ok(RawLineResult {
raw_line: Some(RawLine {
offset: initial_position,
bytes: buf,
}),
discarded_for_size_and_truncated,
})
}
} else {
self.reached_eof = true;
Ok(None)
Ok(RawLineResult {
raw_line: None,
discarded_for_size_and_truncated,
})
}
}
Err(e) => {
Expand Down
9 changes: 6 additions & 3 deletions lib/file-source/src/file_watcher/tests/experiment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use bytes::Bytes;
use quickcheck::{QuickCheck, TestResult};

use crate::{
file_watcher::{tests::*, FileWatcher},
file_watcher::{tests::*, FileWatcher, RawLineResult},
ReadFrom,
};

Expand Down Expand Up @@ -96,11 +96,14 @@ fn experiment(actions: Vec<FileWatcherAction>) {
Err(_) => {
unreachable!();
}
Ok(Some(line)) if line.bytes.is_empty() => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) if line.bytes.is_empty() => {
attempts -= 1;
continue;
}
Ok(None) => {
Ok(RawLineResult { raw_line: None, .. }) => {
attempts -= 1;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
use quickcheck::{QuickCheck, TestResult};

use crate::{
file_watcher::{tests::*, FileWatcher},
file_watcher::{tests::*, FileWatcher, RawLineResult},
ReadFrom,
};

Expand Down Expand Up @@ -63,17 +63,23 @@ fn experiment_no_truncations(actions: Vec<FileWatcherAction>) {
Err(_) => {
unreachable!();
}
Ok(Some(line)) if line.bytes.is_empty() => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) if line.bytes.is_empty() => {
attempts -= 1;
assert!(fwfiles[read_index].read_line().is_none());
continue;
}
Ok(None) => {
Ok(RawLineResult { raw_line: None, .. }) => {
attempts -= 1;
assert!(fwfiles[read_index].read_line().is_none());
continue;
}
Ok(Some(line)) => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) => {
let exp = fwfiles[read_index].read_line().expect("could not readline");
assert_eq!(exp.into_bytes(), line.bytes);
// assert_eq!(sz, buf.len() + 1);
Expand Down
3 changes: 3 additions & 0 deletions lib/file-source/src/fingerprinter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ mod test {
time::Duration,
};

use bytes::BytesMut;
use flate2::write::GzEncoder;
use tempfile::{tempdir, TempDir};

Expand Down Expand Up @@ -815,5 +816,7 @@ mod test {
fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

fn emit_gave_up_on_deleted_file(&self, _: &Path) {}

fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
}
}
9 changes: 9 additions & 0 deletions lib/file-source/src/internal_events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{io::Error, path::Path, time::Duration};

use bytes::BytesMut;

/// Every internal event in this crate has a corresponding
/// method in this trait which should emit the event.
pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
Expand Down Expand Up @@ -28,4 +30,11 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
fn emit_path_globbing_failed(&self, path: &Path, error: &Error);

fn emit_gave_up_on_deleted_file(&self, path: &Path);

fn emit_file_line_too_long(
&self,
truncated_bytes: &BytesMut,
configured_limit: usize,
encountered_size_so_far: usize,
);
}
Loading