Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/codecs/src/decoding/framing/chunked_gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ impl ChunkedGelfDecoder {
warn!(
message_id = message_id,
timeout_secs = timeout.as_secs_f64(),
internal_log_rate_limit = true,
"Message was not fully received within the timeout window. Discarding it."
);
}
Expand All @@ -408,6 +409,7 @@ impl ChunkedGelfDecoder {
debug!(
message_id = message_id,
sequence_number = sequence_number,
internal_log_rate_limit = true,
"Received a duplicate chunk. Ignoring it."
);
return Ok(None);
Expand Down
2 changes: 1 addition & 1 deletion lib/dnstap-parser/src/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl<E: std::fmt::Display> InternalEvent for DnstapParseWarning<E> {
error = %self.error,
stage = error_stage::PROCESSING,
error_type = error_type::PARSER_FAILED,

internal_log_rate_limit = true,
);
}
}
1 change: 1 addition & 0 deletions lib/tracing-limit/benches/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ fn bench(c: &mut Criterion) {
bar = "bar",
baz = 3,
quuux = ?0.99,
internal_log_rate_limit = true
)
}
})
Expand Down
6 changes: 5 additions & 1 deletion lib/tracing-limit/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ fn main() {
tracing::dispatcher::with_default(&dispatch, || {
for i in 0..40usize {
trace!("This field is not rate limited!");
info!(message = "This message is rate limited", count = &i,);
info!(
message = "This message is rate limited",
count = &i,
internal_log_rate_limit = true,
);
std::thread::sleep(std::time::Duration::from_millis(1000));
}
})
Expand Down
1 change: 1 addition & 0 deletions lib/tracing-limit/examples/by_span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fn main() {
message =
"This message is rate limited by its component and vrl_line_number",
count = &i,
internal_log_rate_limit = true,
);
}
}
Expand Down
45 changes: 13 additions & 32 deletions lib/tracing-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ where
let mut limit_visitor = LimitVisitor::default();
event.record(&mut limit_visitor);

let limit_exists = limit_visitor.limit.unwrap_or(true);
let limit_exists = limit_visitor.limit.unwrap_or(false);
if !limit_exists {
return self.inner.on_event(event, ctx);
}
Expand Down Expand Up @@ -264,8 +264,11 @@ where
let valueset = fields.value_set(&values);
let event = Event::new(metadata, &valueset);
self.inner.on_event(&event, ctx.clone());
} else if let Some(ratelimit_field) = fields.field(RATE_LIMIT_FIELD) {
let values = [(&ratelimit_field, Some(&rate_limit as &dyn Value))];
} else {
let values = [(
&fields.field(RATE_LIMIT_FIELD).unwrap(),
Some(&rate_limit as &dyn Value),
)];

let valueset = fields.value_set(&values);
let event = Event::new(metadata, &valueset);
Expand Down Expand Up @@ -522,34 +525,6 @@ mod test {
);
}

#[test]
fn rate_limits_default() {
let events: Arc<Mutex<Vec<String>>> = Default::default();

let recorder = RecordingLayer::new(Arc::clone(&events));
let sub = tracing_subscriber::registry::Registry::default()
.with(RateLimitedLayer::new(recorder).with_default_limit(10));
tracing::subscriber::with_default(sub, || {
for _ in 0..21 {
info!(message = "Hello world!");
MockClock::advance(Duration::from_millis(100));
}
});

let events = events.lock().unwrap();

assert_eq!(
*events,
vec![
"Hello world!",
"Internal log [Hello world!] is being suppressed to avoid flooding.",
]
.into_iter()
.map(std::borrow::ToOwned::to_owned)
.collect::<Vec<String>>()
);
}

#[test]
fn override_rate_limit_at_callsite() {
let events: Arc<Mutex<Vec<String>>> = Default::default();
Expand All @@ -559,7 +534,11 @@ mod test {
.with(RateLimitedLayer::new(recorder).with_default_limit(100));
tracing::subscriber::with_default(sub, || {
for _ in 0..21 {
info!(message = "Hello world!", internal_log_rate_secs = 1);
info!(
message = "Hello world!",
internal_log_rate_limit = true,
internal_log_rate_secs = 1
);
MockClock::advance(Duration::from_millis(100));
}
});
Expand Down Expand Up @@ -599,6 +578,7 @@ mod test {
let _enter = span.enter();
info!(
message = format!("Hello {key} on line_number {line_number}!").as_str(),
internal_log_rate_limit = true
);
}
}
Expand Down Expand Up @@ -659,6 +639,7 @@ mod test {
for line_number in &[1, 2] {
info!(
message = format!("Hello {key} on line_number {line_number}!").as_str(),
internal_log_rate_limit = true,
component_id = &key,
vrl_position = &line_number
);
Expand Down
1 change: 1 addition & 0 deletions lib/vector-buffers/src/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl InternalEvent for BufferReadError {
error_code = self.error_code,
error_type = error_type::READER_FAILED,
stage = "processing",
internal_log_rate_limit = true,
);
counter!(
"buffer_errors_total", "error_code" => self.error_code,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ impl<const INTENDED: bool> InternalEventHandle for DroppedHandle<'_, INTENDED> {
intentional = INTENDED,
count = data.0,
reason = self.reason,
internal_log_rate_limit = true,
);
} else {
error!(
message,
intentional = INTENDED,
count = data.0,
reason = self.reason,
internal_log_rate_limit = true,
);
}
self.discarded_events.increment(data.0 as u64);
Expand Down
3 changes: 2 additions & 1 deletion lib/vector-common/src/internal_event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ impl<E: std::fmt::Debug> InternalEvent for PollReadyError<E> {
error = ?self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -44,7 +45,7 @@ impl<E: std::fmt::Debug> InternalEvent for CallError<E> {
request_id = self.request_id,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
5 changes: 4 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,10 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64)
};

trace::init(color, json, &level, rate);
debug!(message = "Internal log rate limit configured.",);
debug!(
message = "Internal log rate limit configured.",
internal_log_rate_secs = rate,
);
info!(message = "Log level is enabled.", level = ?level);
}

Expand Down
1 change: 0 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ pub struct RootOpts {
pub watch_config_poll_interval_seconds: NonZeroU64,

/// Set the internal log rate limit
/// Note that traces are throttled by default unless tagged with `internal_log_rate_limit = false`.
#[arg(
short,
long,
Expand Down
4 changes: 2 additions & 2 deletions src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ impl HttpResourceOutputContext<'_> {
// entire payload which may contain multiple frames and their delimiters.
Ok(Some((events, decoded_byte_size))) => {
if should_reject {
info!("HTTP server external output resource decoded {decoded_byte_size} bytes but test case configured to reject.");
info!(internal_log_rate_limit = true, "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.", );
} else {
let mut output_runner_metrics =
output_runner_metrics.lock().await;
info!("HTTP server external output resource decoded {decoded_byte_size} bytes.");
info!(internal_log_rate_limit = true, "HTTP server external output resource decoded {decoded_byte_size:?} bytes.");

// Update the runner metrics for the received events. This will later
// be used in the Validators, as the "expected" case.
Expand Down
12 changes: 8 additions & 4 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,30 @@ pub fn spawn_thread<'a>(
info!("Configuration file changed.");
if !changed_components.is_empty() {
info!(
internal_log_rate_limit = true,
"Component {:?} configuration changed.",
changed_components.keys()
);
if changed_components
.iter()
.all(|(_, t)| *t == ComponentType::EnrichmentTable)
{
info!("Only enrichment tables have changed.",);
info!(
internal_log_rate_limit = true,
"Only enrichment tables have changed."
);
_ = signal_tx.send(crate::signal::SignalTo::ReloadEnrichmentTables).map_err(|error| {
error!(message = "Unable to reload enrichment tables.", cause = %error)
error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true)
});
} else {
_ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(changed_components.into_keys().collect())).map_err(|error| {
error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error)
error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
});
}
} else {
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
.map_err(|error| {
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
});
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/internal_events/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub mod source {
error = ?self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::RECEIVING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand All @@ -58,7 +58,7 @@ pub mod source {
error = ?self.error,
error_type = error_type::ACKNOWLEDGMENT_FAILED,
stage = error_stage::RECEIVING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand All @@ -80,7 +80,7 @@ pub mod source {
error = ?self.error,
error_type = error_type::COMMAND_FAILED,
stage = error_stage::RECEIVING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/apache_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl InternalEvent for ApacheMetricsParseError<'_> {
stage = error_stage::PROCESSING,
error_type = error_type::PARSER_FAILED,
endpoint = %self.endpoint,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
1 change: 1 addition & 0 deletions src/internal_events/aws_cloudwatch_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl InternalEvent for AwsCloudwatchLogsMessageSizeError {
error_code = "message_too_long",
error_type = error_type::ENCODER_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/aws_ec2_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl InternalEvent for AwsEc2MetadataRefreshError {
error = %self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::PROCESSING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
4 changes: 2 additions & 2 deletions src/internal_events/aws_ecs_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ impl InternalEvent for AwsEcsMetricsParseError<'_> {
error = ?self.error,
stage = error_stage::PROCESSING,
error_type = error_type::PARSER_FAILED,

internal_log_rate_limit = true,
);
debug!(
message = %format!("Failed to parse response:\\n\\n{}\\n\\n", self.body.escape_debug()),
endpoint = %self.endpoint,

internal_log_rate_limit = true,
);
counter!("parse_errors_total").increment(1);
counter!(
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/aws_kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl InternalEvent for AwsKinesisStreamNoPartitionKeyError<'_> {
partition_key_field = %self.partition_key_field,
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,

internal_log_rate_limit = true,
);

counter!(
Expand Down
3 changes: 2 additions & 1 deletion src/internal_events/aws_kinesis_firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl InternalEvent for AwsKinesisFirehoseRequestError<'_> {
error_type = error_type::REQUEST_FAILED,
error_code = %self.error_code,
request_id = %self.request_id.unwrap_or(""),
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand All @@ -74,7 +75,7 @@ impl InternalEvent for AwsKinesisFirehoseAutomaticRecordDecodeError {
error_type = error_type::PARSER_FAILED,
error_code = %io_error_code(&self.error),
compression = %self.compression,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
10 changes: 5 additions & 5 deletions src/internal_events/aws_sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mod s3 {
error_code = "failed_processing_sqs_message",
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -74,7 +74,7 @@ mod s3 {
error_code = "failed_deleting_some_sqs_messages",
error_type = error_type::ACKNOWLEDGMENT_FAILED,
stage = error_stage::PROCESSING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -104,7 +104,7 @@ mod s3 {
error_code = "failed_deleting_all_sqs_messages",
error_type = error_type::ACKNOWLEDGMENT_FAILED,
stage = error_stage::PROCESSING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -204,7 +204,7 @@ impl<E: std::fmt::Display> InternalEvent for SqsMessageReceiveError<'_, E> {
error_code = "failed_fetching_sqs_events",
error_type = error_type::REQUEST_FAILED,
stage = error_stage::RECEIVING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -257,7 +257,7 @@ impl<E: std::fmt::Display> InternalEvent for SqsMessageDeleteError<'_, E> {
error = %self.error,
error_type = error_type::WRITER_FAILED,
stage = error_stage::PROCESSING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl InternalEvent for LargeEventDroppedError {
length = %self.length,
error_type = error_type::CONDITION_FAILED,
stage = error_stage::SENDING,

internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
Loading
Loading