From cf28141eea2b3f315064112d5ba9b56cb3cd12de Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 8 Aug 2025 15:51:19 -0400 Subject: [PATCH 1/6] Revert "feat(dev): Enable `internal_log_rate_limit` by default (#22899)" This reverts commit dffcb5a02c881dd94a84470722ca4e6040a9cc43. --- .../src/decoding/framing/chunked_gelf.rs | 2 + lib/dnstap-parser/src/internal_events.rs | 2 +- lib/tracing-limit/benches/limit.rs | 1 + lib/tracing-limit/examples/basic.rs | 6 ++- lib/tracing-limit/examples/by_span.rs | 1 + lib/tracing-limit/src/lib.rs | 43 +++++-------------- lib/vector-buffers/src/internal_events.rs | 1 + .../component_events_dropped.rs | 2 + .../src/internal_event/service.rs | 3 +- src/app.rs | 5 ++- src/cli.rs | 1 - src/internal_events/amqp.rs | 6 +-- src/internal_events/apache_metrics.rs | 2 +- src/internal_events/aws_cloudwatch_logs.rs | 1 + src/internal_events/aws_ec2_metadata.rs | 2 +- src/internal_events/aws_ecs_metrics.rs | 4 +- src/internal_events/aws_kinesis.rs | 2 +- src/internal_events/aws_kinesis_firehose.rs | 3 +- src/internal_events/aws_sqs.rs | 10 ++--- src/internal_events/batch.rs | 2 +- src/internal_events/codecs.rs | 10 ++--- src/internal_events/common.rs | 5 ++- src/internal_events/conditions.rs | 1 + src/internal_events/datadog_agent.rs | 2 +- src/internal_events/datadog_metrics.rs | 1 + src/internal_events/datadog_traces.rs | 4 +- src/internal_events/dnstap.rs | 2 +- src/internal_events/docker_logs.rs | 2 +- src/internal_events/encoding_transcode.rs | 1 + src/internal_events/eventstoredb_metrics.rs | 4 +- src/internal_events/exec.rs | 9 ++-- src/internal_events/file.rs | 9 ++-- src/internal_events/fluent.rs | 2 +- src/internal_events/gcp_pubsub.rs | 6 +-- src/internal_events/host_metrics.rs | 5 ++- src/internal_events/http.rs | 2 +- src/internal_events/http_client.rs | 2 +- src/internal_events/http_client_source.rs | 4 +- src/internal_events/influxdb.rs | 1 + src/internal_events/journald.rs | 9 ++-- src/internal_events/kafka.rs | 5 ++- src/internal_events/kubernetes_logs.rs | 10 ++--- src/internal_events/logplex.rs | 2 +- src/internal_events/loki.rs | 4 ++ src/internal_events/lua.rs | 3 +- src/internal_events/metric_to_log.rs | 1 + src/internal_events/mongodb_metrics.rs | 4 +- src/internal_events/mqtt.rs | 2 +- src/internal_events/nginx_metrics.rs | 4 +- src/internal_events/parser.rs | 1 + src/internal_events/postgresql_metrics.rs | 2 +- src/internal_events/process.rs | 3 ++ src/internal_events/prometheus.rs | 5 ++- src/internal_events/pulsar.rs | 10 ++--- src/internal_events/redis.rs | 2 +- src/internal_events/reduce.rs | 2 +- src/internal_events/remap.rs | 2 +- src/internal_events/sematext_metrics.rs | 4 +- src/internal_events/socket.rs | 6 +-- src/internal_events/splunk_hec.rs | 10 ++--- src/internal_events/statsd_sink.rs | 2 +- src/internal_events/tag_cardinality_limit.rs | 2 + src/internal_events/tcp.rs | 4 ++ src/internal_events/template.rs | 4 +- src/internal_events/udp.rs | 1 + src/internal_events/unix.rs | 7 +-- src/internal_events/websocket.rs | 4 +- src/internal_events/websocket_server.rs | 4 +- src/internal_events/windows.rs | 1 + src/sinks/datadog/logs/sink.rs | 1 + src/sinks/datadog/traces/apm_stats/flusher.rs | 1 + src/sinks/gcp/stackdriver/logs/encoder.rs | 1 + .../util/adaptive_concurrency/controller.rs | 1 + src/sinks/util/retries.rs | 4 +- src/sources/datadog_agent/metrics.rs | 5 ++- src/sources/heroku_logs.rs | 1 + src/sources/socket/udp.rs | 2 + src/transforms/lua/v1/mod.rs | 1 + 78 files changed, 174 insertions(+), 134 deletions(-) diff --git a/lib/codecs/src/decoding/framing/chunked_gelf.rs b/lib/codecs/src/decoding/framing/chunked_gelf.rs index 51ffd53fb6f1b..9c34e002cce43 100644 --- a/lib/codecs/src/decoding/framing/chunked_gelf.rs +++ b/lib/codecs/src/decoding/framing/chunked_gelf.rs @@ -387,6 +387,7 @@ impl ChunkedGelfDecoder { warn!( message_id = message_id, timeout_secs = timeout.as_secs_f64(), + internal_log_rate_limit = true, "Message was not fully received within the timeout window. Discarding it." ); } @@ -408,6 +409,7 @@ impl ChunkedGelfDecoder { debug!( message_id = message_id, sequence_number = sequence_number, + internal_log_rate_limit = true, "Received a duplicate chunk. Ignoring it." ); return Ok(None); diff --git a/lib/dnstap-parser/src/internal_events.rs b/lib/dnstap-parser/src/internal_events.rs index 2ce75acb19709..ab1635c9b3e03 100644 --- a/lib/dnstap-parser/src/internal_events.rs +++ b/lib/dnstap-parser/src/internal_events.rs @@ -14,7 +14,7 @@ impl InternalEvent for DnstapParseWarning { error = %self.error, stage = error_stage::PROCESSING, error_type = error_type::PARSER_FAILED, - + internal_log_rate_limit = true, ); } } diff --git a/lib/tracing-limit/benches/limit.rs b/lib/tracing-limit/benches/limit.rs index 3a43707d7f992..79ed62b6de3d4 100644 --- a/lib/tracing-limit/benches/limit.rs +++ b/lib/tracing-limit/benches/limit.rs @@ -58,6 +58,7 @@ fn bench(c: &mut Criterion) { bar = "bar", baz = 3, quuux = ?0.99, + internal_log_rate_limit = true ) } }) diff --git a/lib/tracing-limit/examples/basic.rs b/lib/tracing-limit/examples/basic.rs index 86c973739b13a..3b4428d540997 100644 --- a/lib/tracing-limit/examples/basic.rs +++ b/lib/tracing-limit/examples/basic.rs @@ -14,7 +14,11 @@ fn main() { tracing::dispatcher::with_default(&dispatch, || { for i in 0..40usize { trace!("This field is not rate limited!"); - info!(message = "This message is rate limited", count = &i,); + info!( + message = "This message is rate limited", + count = &i, + internal_log_rate_limit = true, + ); std::thread::sleep(std::time::Duration::from_millis(1000)); } }) diff --git a/lib/tracing-limit/examples/by_span.rs b/lib/tracing-limit/examples/by_span.rs index 0c7e81666d279..63b8775d9905d 100644 --- a/lib/tracing-limit/examples/by_span.rs +++ b/lib/tracing-limit/examples/by_span.rs @@ -28,6 +28,7 @@ fn main() { message = "This message is rate limited by its component and vrl_line_number", count = &i, + internal_log_rate_limit = true, ); } } diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index 3f29cd7fa6c1d..44fb9848de417 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -129,7 +129,7 @@ where let mut limit_visitor = LimitVisitor::default(); event.record(&mut limit_visitor); - let limit_exists = limit_visitor.limit.unwrap_or(true); + let limit_exists = limit_visitor.limit.unwrap_or(false); if !limit_exists { return self.inner.on_event(event, ctx); } @@ -264,8 +264,11 @@ where let valueset = fields.value_set(&values); let event = Event::new(metadata, &valueset); self.inner.on_event(&event, ctx.clone()); - } else if let Some(ratelimit_field) = fields.field(RATE_LIMIT_FIELD) { - let values = [(&ratelimit_field, Some(&rate_limit as &dyn Value))]; + } else { + let values = [( + &fields.field(RATE_LIMIT_FIELD).unwrap(), + Some(&rate_limit as &dyn Value), + )]; let valueset = fields.value_set(&values); let event = Event::new(metadata, &valueset); @@ -522,34 +525,6 @@ mod test { ); } - #[test] - fn rate_limits_default() { - let events: Arc>> = Default::default(); - - let recorder = RecordingLayer::new(Arc::clone(&events)); - let sub = tracing_subscriber::registry::Registry::default() - .with(RateLimitedLayer::new(recorder).with_default_limit(10)); - tracing::subscriber::with_default(sub, || { - for _ in 0..21 { - info!(message = "Hello world!"); - MockClock::advance(Duration::from_millis(100)); - } - }); - - let events = events.lock().unwrap(); - - assert_eq!( - *events, - vec![ - "Hello world!", - "Internal log [Hello world!] is being suppressed to avoid flooding.", - ] - .into_iter() - .map(std::borrow::ToOwned::to_owned) - .collect::>() - ); - } - #[test] fn override_rate_limit_at_callsite() { let events: Arc>> = Default::default(); @@ -559,7 +534,11 @@ mod test { .with(RateLimitedLayer::new(recorder).with_default_limit(100)); tracing::subscriber::with_default(sub, || { for _ in 0..21 { - info!(message = "Hello world!", internal_log_rate_secs = 1); + info!( + message = "Hello world!", + internal_log_rate_limit = true, + internal_log_rate_secs = 1 + ); MockClock::advance(Duration::from_millis(100)); } }); diff --git a/lib/vector-buffers/src/internal_events.rs b/lib/vector-buffers/src/internal_events.rs index 30e92d39135e4..659622415847b 100644 --- a/lib/vector-buffers/src/internal_events.rs +++ b/lib/vector-buffers/src/internal_events.rs @@ -177,6 +177,7 @@ impl InternalEvent for BufferReadError { error_code = self.error_code, error_type = error_type::READER_FAILED, stage = "processing", + internal_log_rate_limit = true, ); counter!( "buffer_errors_total", "error_code" => self.error_code, diff --git a/lib/vector-common/src/internal_event/component_events_dropped.rs b/lib/vector-common/src/internal_event/component_events_dropped.rs index f8838911ff8a0..c7d2bd9eedd32 100644 --- a/lib/vector-common/src/internal_event/component_events_dropped.rs +++ b/lib/vector-common/src/internal_event/component_events_dropped.rs @@ -58,6 +58,7 @@ impl InternalEventHandle for DroppedHandle<'_, INTENDED> { intentional = INTENDED, count = data.0, reason = self.reason, + internal_log_rate_limit = true, ); } else { error!( @@ -65,6 +66,7 @@ impl InternalEventHandle for DroppedHandle<'_, INTENDED> { intentional = INTENDED, count = data.0, reason = self.reason, + internal_log_rate_limit = true, ); } self.discarded_events.increment(data.0 as u64); diff --git a/lib/vector-common/src/internal_event/service.rs b/lib/vector-common/src/internal_event/service.rs index d6c912f9d3fbc..20132cc99fc61 100644 --- a/lib/vector-common/src/internal_event/service.rs +++ b/lib/vector-common/src/internal_event/service.rs @@ -14,6 +14,7 @@ impl InternalEvent for PollReadyError { error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -44,7 +45,7 @@ impl InternalEvent for CallError { request_id = self.request_id, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/app.rs b/src/app.rs index e9b0cc8dff7df..c5c69a8209a9c 100644 --- a/src/app.rs +++ b/src/app.rs @@ -615,7 +615,10 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) }; trace::init(color, json, &level, rate); - debug!(message = "Internal log rate limit configured.",); + debug!( + message = "Internal log rate limit configured.", + internal_log_rate_secs = rate, + ); info!(message = "Log level is enabled.", level = ?level); } diff --git a/src/cli.rs b/src/cli.rs index 93e1dfea4d6bd..279560154585c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -178,7 +178,6 @@ pub struct RootOpts { pub watch_config_poll_interval_seconds: NonZeroU64, /// Set the internal log rate limit - /// Note that traces are throttled by default unless tagged with `internal_log_rate_limit = false`. #[arg( short, long, diff --git a/src/internal_events/amqp.rs b/src/internal_events/amqp.rs index 677ec605ea234..c66a3d9087016 100644 --- a/src/internal_events/amqp.rs +++ b/src/internal_events/amqp.rs @@ -36,7 +36,7 @@ pub mod source { error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -58,7 +58,7 @@ pub mod source { error = ?self.error, error_type = error_type::ACKNOWLEDGMENT_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -80,7 +80,7 @@ pub mod source { error = ?self.error, error_type = error_type::COMMAND_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/apache_metrics.rs b/src/internal_events/apache_metrics.rs index 849741e44dd41..79ae7dced79b4 100644 --- a/src/internal_events/apache_metrics.rs +++ b/src/internal_events/apache_metrics.rs @@ -46,7 +46,7 @@ impl InternalEvent for ApacheMetricsParseError<'_> { stage = error_stage::PROCESSING, error_type = error_type::PARSER_FAILED, endpoint = %self.endpoint, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/aws_cloudwatch_logs.rs b/src/internal_events/aws_cloudwatch_logs.rs index d8c4838dab7e1..d4bfe0328a53b 100644 --- a/src/internal_events/aws_cloudwatch_logs.rs +++ b/src/internal_events/aws_cloudwatch_logs.rs @@ -18,6 +18,7 @@ impl InternalEvent for AwsCloudwatchLogsMessageSizeError { error_code = "message_too_long", error_type = error_type::ENCODER_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/aws_ec2_metadata.rs b/src/internal_events/aws_ec2_metadata.rs index 3eafadf20222d..3e0996a3c9f60 100644 --- a/src/internal_events/aws_ec2_metadata.rs +++ b/src/internal_events/aws_ec2_metadata.rs @@ -24,7 +24,7 @@ impl InternalEvent for AwsEc2MetadataRefreshError { error = %self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/aws_ecs_metrics.rs b/src/internal_events/aws_ecs_metrics.rs index 969c5c33aa2e6..1a2f7133094ce 100644 --- a/src/internal_events/aws_ecs_metrics.rs +++ b/src/internal_events/aws_ecs_metrics.rs @@ -50,12 +50,12 @@ impl InternalEvent for AwsEcsMetricsParseError<'_> { error = ?self.error, stage = error_stage::PROCESSING, error_type = error_type::PARSER_FAILED, - + internal_log_rate_limit = true, ); debug!( message = %format!("Failed to parse response:\\n\\n{}\\n\\n", self.body.escape_debug()), endpoint = %self.endpoint, - + internal_log_rate_limit = true, ); counter!("parse_errors_total").increment(1); counter!( diff --git a/src/internal_events/aws_kinesis.rs b/src/internal_events/aws_kinesis.rs index fd5f73bb7e11f..332f55d08b25e 100644 --- a/src/internal_events/aws_kinesis.rs +++ b/src/internal_events/aws_kinesis.rs @@ -17,7 +17,7 @@ impl InternalEvent for AwsKinesisStreamNoPartitionKeyError<'_> { partition_key_field = %self.partition_key_field, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( diff --git a/src/internal_events/aws_kinesis_firehose.rs b/src/internal_events/aws_kinesis_firehose.rs index 317435696b799..d98d5ed5fa1c4 100644 --- a/src/internal_events/aws_kinesis_firehose.rs +++ b/src/internal_events/aws_kinesis_firehose.rs @@ -48,6 +48,7 @@ impl InternalEvent for AwsKinesisFirehoseRequestError<'_> { error_type = error_type::REQUEST_FAILED, error_code = %self.error_code, request_id = %self.request_id.unwrap_or(""), + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -74,7 +75,7 @@ impl InternalEvent for AwsKinesisFirehoseAutomaticRecordDecodeError { error_type = error_type::PARSER_FAILED, error_code = %io_error_code(&self.error), compression = %self.compression, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/aws_sqs.rs b/src/internal_events/aws_sqs.rs index 37f4ea98d25ec..490a65f004190 100644 --- a/src/internal_events/aws_sqs.rs +++ b/src/internal_events/aws_sqs.rs @@ -30,7 +30,7 @@ mod s3 { error_code = "failed_processing_sqs_message", error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -74,7 +74,7 @@ mod s3 { error_code = "failed_deleting_some_sqs_messages", error_type = error_type::ACKNOWLEDGMENT_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -104,7 +104,7 @@ mod s3 { error_code = "failed_deleting_all_sqs_messages", error_type = error_type::ACKNOWLEDGMENT_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -204,7 +204,7 @@ impl InternalEvent for SqsMessageReceiveError<'_, E> { error_code = "failed_fetching_sqs_events", error_type = error_type::REQUEST_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -257,7 +257,7 @@ impl InternalEvent for SqsMessageDeleteError<'_, E> { error = %self.error, error_type = error_type::WRITER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/batch.rs b/src/internal_events/batch.rs index 5afc99d47f1bf..816afa4398d5f 100644 --- a/src/internal_events/batch.rs +++ b/src/internal_events/batch.rs @@ -17,7 +17,7 @@ impl InternalEvent for LargeEventDroppedError { length = %self.length, error_type = error_type::CONDITION_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/codecs.rs b/src/internal_events/codecs.rs index e876db1dab5ab..50251bf089cd3 100644 --- a/src/internal_events/codecs.rs +++ b/src/internal_events/codecs.rs @@ -15,7 +15,7 @@ impl InternalEvent for DecoderFramingError { error_code = "decoder_frame", error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -40,7 +40,7 @@ impl InternalEvent for DecoderDeserializeError<'_> { error_code = "decoder_deserialize", error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -66,7 +66,7 @@ impl InternalEvent for EncoderFramingError<'_> { error_code = "encoder_frame", error_type = error_type::ENCODER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -93,7 +93,7 @@ impl InternalEvent for EncoderSerializeError<'_> { error_code = "encoder_serialize", error_type = error_type::ENCODER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -120,7 +120,7 @@ impl InternalEvent for EncoderWriteError<'_, E> { error = %self.error, error_type = error_type::IO_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/common.rs b/src/internal_events/common.rs index f342710daedb2..78971840d7dba 100644 --- a/src/internal_events/common.rs +++ b/src/internal_events/common.rs @@ -66,7 +66,7 @@ impl InternalEvent for SocketOutgoingConnectionError { error_code = "failed_connecting", error_type = error_type::CONNECTION_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -92,6 +92,7 @@ impl InternalEvent for StreamClosedError { error_code = STREAM_CLOSED, error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -136,7 +137,7 @@ impl InternalEvent for SinkRequestBuildError { error = %self.error, error_type = error_type::ENCODER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/conditions.rs b/src/internal_events/conditions.rs index 0f7cb70962b08..bf825213c6719 100644 --- a/src/internal_events/conditions.rs +++ b/src/internal_events/conditions.rs @@ -14,6 +14,7 @@ impl InternalEvent for VrlConditionExecutionError<'_> { error = %self.error, error_type = error_type::SCRIPT_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/datadog_agent.rs b/src/internal_events/datadog_agent.rs index 686e3e0e3179c..02f017bc076bb 100644 --- a/src/internal_events/datadog_agent.rs +++ b/src/internal_events/datadog_agent.rs @@ -15,7 +15,7 @@ impl InternalEvent for DatadogAgentJsonParseError<'_> { error = ?self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/datadog_metrics.rs b/src/internal_events/datadog_metrics.rs index 03812c2c81def..8875b29281ac7 100644 --- a/src/internal_events/datadog_metrics.rs +++ b/src/internal_events/datadog_metrics.rs @@ -17,6 +17,7 @@ impl InternalEvent for DatadogMetricsEncodingError<'_> { error_type = error_type::ENCODER_FAILED, intentional = "false", stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/datadog_traces.rs b/src/internal_events/datadog_traces.rs index aee946528391b..dd1df213cdf06 100644 --- a/src/internal_events/datadog_traces.rs +++ b/src/internal_events/datadog_traces.rs @@ -18,7 +18,7 @@ impl InternalEvent for DatadogTracesEncodingError { error_reason = %self.error_reason, error_type = error_type::ENCODER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -48,7 +48,7 @@ impl InternalEvent for DatadogTracesAPMStatsError { error = %self.error, error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/dnstap.rs b/src/internal_events/dnstap.rs index adf9ea8688d6c..231a6527830d7 100644 --- a/src/internal_events/dnstap.rs +++ b/src/internal_events/dnstap.rs @@ -14,7 +14,7 @@ impl InternalEvent for DnstapParseError { error = %self.error, stage = error_stage::PROCESSING, error_type = error_type::PARSER_FAILED, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs index f1daaa6c2d4f4..81950b1fa947a 100644 --- a/src/internal_events/docker_logs.rs +++ b/src/internal_events/docker_logs.rs @@ -170,7 +170,7 @@ impl InternalEvent for DockerLogsLoggingDriverUnsupportedError<'_> { error_type = error_type::CONFIGURATION_FAILED, stage = error_stage::RECEIVING, container_id = ?self.container_id, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/encoding_transcode.rs b/src/internal_events/encoding_transcode.rs index 869423fb98241..0a377f52ca1ee 100644 --- a/src/internal_events/encoding_transcode.rs +++ b/src/internal_events/encoding_transcode.rs @@ -27,6 +27,7 @@ impl InternalEvent for DecoderMalformedReplacement { warn!( message = "Replaced malformed sequences with replacement character while decoding to utf8.", from_encoding = %self.from_encoding, + internal_log_rate_limit = true ); // NOT the actual number of replacements in the output: there's no easy // way to get that from the lib we use here (encoding_rs) diff --git a/src/internal_events/eventstoredb_metrics.rs b/src/internal_events/eventstoredb_metrics.rs index 7ee7f36cf9f50..b5d7fadd088dc 100644 --- a/src/internal_events/eventstoredb_metrics.rs +++ b/src/internal_events/eventstoredb_metrics.rs @@ -14,7 +14,7 @@ impl InternalEvent for EventStoreDbMetricsHttpError { error = ?self.error, stage = error_stage::RECEIVING, error_type = error_type::REQUEST_FAILED, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -37,7 +37,7 @@ impl InternalEvent for EventStoreDbStatsParsingError { error = ?self.error, stage = error_stage::PROCESSING, error_type = error_type::PARSER_FAILED, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/exec.rs b/src/internal_events/exec.rs index 1cb656cd493c6..fea1f498b725f 100644 --- a/src/internal_events/exec.rs +++ b/src/internal_events/exec.rs @@ -53,7 +53,7 @@ impl InternalEvent for ExecFailedError<'_> { error_type = error_type::COMMAND_FAILED, error_code = %io_error_code(&self.error), stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -82,7 +82,7 @@ impl InternalEvent for ExecTimeoutError<'_> { error = %self.error, error_type = error_type::TIMED_OUT, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -118,7 +118,7 @@ impl InternalEvent for ExecCommandExecuted<'_> { command = %self.command, exit_status = %exit_status, elapsed_millis = %self.exec_duration.as_millis(), - + internal_log_rate_limit = true, ); counter!( "command_executed_total", @@ -194,7 +194,7 @@ impl InternalEvent for ExecFailedToSignalChildError<'_> { error_code = %self.error.to_error_code(), error_type = error_type::COMMAND_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -216,6 +216,7 @@ impl InternalEvent for ExecChannelClosedError { message = exec_reason, error_type = error_type::COMMAND_FAILED, stage = error_stage::RECEIVING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs index c6d80460143f2..4697c93c89ca7 100644 --- a/src/internal_events/file.rs +++ b/src/internal_events/file.rs @@ -83,6 +83,7 @@ impl InternalEvent for FileIoError<'_, P> { error_code = %self.code, error_type = error_type::IO_FAILED, stage = error_stage::SENDING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -223,7 +224,7 @@ mod source { error_code = "reading_fingerprint", error_type = error_type::READER_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); if self.include_file_metric_tag { counter!( @@ -263,6 +264,7 @@ mod source { error_code = DELETION_FAILED, error_type = error_type::COMMAND_FAILED, stage = error_stage::RECEIVING, + internal_log_rate_limit = true, ); if self.include_file_metric_tag { counter!( @@ -355,7 +357,7 @@ mod source { error_type = error_type::COMMAND_FAILED, stage = error_stage::RECEIVING, file = %self.file.display(), - + internal_log_rate_limit = true, ); if self.include_file_metric_tag { counter!( @@ -457,7 +459,7 @@ mod source { error_code = "writing_checkpoints", error_type = error_type::WRITER_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -484,6 +486,7 @@ mod source { error_type = error_type::READER_FAILED, stage = error_stage::RECEIVING, path = %self.path.display(), + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/fluent.rs b/src/internal_events/fluent.rs index aa0d26610b72a..3493db9cd762d 100644 --- a/src/internal_events/fluent.rs +++ b/src/internal_events/fluent.rs @@ -30,7 +30,7 @@ impl InternalEvent for FluentMessageDecodeError<'_> { base64_encoded_message = %self.base64_encoded_message, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/gcp_pubsub.rs b/src/internal_events/gcp_pubsub.rs index be3f274e8d225..1f6a7b5253514 100644 --- a/src/internal_events/gcp_pubsub.rs +++ b/src/internal_events/gcp_pubsub.rs @@ -14,7 +14,7 @@ impl InternalEvent for GcpPubsubConnectError { error_code = "failed_connecting", error_type = error_type::CONNECTION_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( @@ -39,7 +39,7 @@ impl InternalEvent for GcpPubsubStreamingPullError { error_code = "failed_streaming_pull", error_type = error_type::REQUEST_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( @@ -64,7 +64,7 @@ impl InternalEvent for GcpPubsubReceiveError { error_code = "failed_fetching_events", error_type = error_type::REQUEST_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( diff --git a/src/internal_events/host_metrics.rs b/src/internal_events/host_metrics.rs index 56d5649159dfb..3352e529edf0a 100644 --- a/src/internal_events/host_metrics.rs +++ b/src/internal_events/host_metrics.rs @@ -13,6 +13,7 @@ impl InternalEvent for HostMetricsScrapeError { message = self.message, error_type = error_type::READER_FAILED, stage = error_stage::RECEIVING, + internal_log_rate_limit = true, ); counter!( @@ -37,7 +38,7 @@ impl InternalEvent for HostMetricsScrapeDetailError { error = %self.error, error_type = error_type::READER_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( @@ -64,7 +65,7 @@ impl InternalEvent for HostMetricsScrapeFilesystemError { error = %self.error, error_type = error_type::READER_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index 5a1e818dcc484..b2b3bfec4ad95 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -127,7 +127,7 @@ impl InternalEvent for HttpBadRequest<'_> { error_type = error_type::REQUEST_FAILED, error_stage = error_stage::RECEIVING, http_code = %self.code, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/http_client.rs b/src/internal_events/http_client.rs index 4aabe421d083c..a35d290395be3 100644 --- a/src/internal_events/http_client.rs +++ b/src/internal_events/http_client.rs @@ -86,7 +86,7 @@ impl InternalEvent for GotHttpWarning<'_> { error = %self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!("http_client_errors_total", "error_kind" => self.error.to_string()).increment(1); histogram!("http_client_rtt_seconds").record(self.roundtrip); diff --git a/src/internal_events/http_client_source.rs b/src/internal_events/http_client_source.rs index ad1b989d3969a..376adf35eda56 100644 --- a/src/internal_events/http_client_source.rs +++ b/src/internal_events/http_client_source.rs @@ -49,7 +49,7 @@ impl InternalEvent for HttpClientHttpResponseError { stage = error_stage::RECEIVING, error_type = error_type::REQUEST_FAILED, error_code = %http_error_code(self.code.as_u16()), - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -76,7 +76,7 @@ impl InternalEvent for HttpClientHttpError { error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/influxdb.rs b/src/internal_events/influxdb.rs index d487353b8dc68..007f5565f1a8f 100644 --- a/src/internal_events/influxdb.rs +++ b/src/internal_events/influxdb.rs @@ -16,6 +16,7 @@ impl InternalEvent for InfluxdbEncodingError { error = %self.error_message, error_type = error_type::ENCODER_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/journald.rs b/src/internal_events/journald.rs index 9f7e9a694f9ca..f467137186baa 100644 --- a/src/internal_events/journald.rs +++ b/src/internal_events/journald.rs @@ -17,7 +17,7 @@ impl InternalEvent for JournaldInvalidRecordError { text = %self.text, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -40,7 +40,7 @@ impl InternalEvent for JournaldStartJournalctlError { error = %self.error, error_type = error_type::COMMAND_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -63,7 +63,7 @@ impl InternalEvent for JournaldReadError { error = %self.error, error_type = error_type::READER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -88,6 +88,7 @@ impl InternalEvent for JournaldCheckpointSetError { error = %self.error, error_type = error_type::IO_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -112,7 +113,7 @@ impl InternalEvent for JournaldCheckpointFileOpenError { error = %self.error, error_type = error_type::IO_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/kafka.rs b/src/internal_events/kafka.rs index 05e06ea5161ff..ea1b198be7053 100644 --- a/src/internal_events/kafka.rs +++ b/src/internal_events/kafka.rs @@ -78,7 +78,7 @@ impl InternalEvent for KafkaOffsetUpdateError { error_code = "kafka_offset_update", error_type = error_type::READER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -103,7 +103,7 @@ impl InternalEvent for KafkaReadError { error_code = "reading_message", error_type = error_type::READER_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -163,6 +163,7 @@ impl InternalEvent for KafkaHeaderExtractionError<'_> { error_type = error_type::PARSER_FAILED, stage = error_stage::RECEIVING, header_field = self.header_field.to_string(), + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs index ed68c14790656..39d27649cef5e 100644 --- a/src/internal_events/kubernetes_logs.rs +++ b/src/internal_events/kubernetes_logs.rs @@ -71,7 +71,7 @@ impl InternalEvent for KubernetesLogsEventAnnotationError<'_> { error_code = ANNOTATION_FAILED, error_type = error_type::READER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -96,7 +96,7 @@ impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> { error_code = ANNOTATION_FAILED, error_type = error_type::READER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -122,7 +122,7 @@ impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> { error_code = ANNOTATION_FAILED, error_type = error_type::READER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -162,7 +162,7 @@ impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> { error = %self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -191,7 +191,7 @@ impl InternalEvent for KubernetesLifecycleError { error_code = KUBERNETES_LIFECYCLE, error_type = error_type::READER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/logplex.rs b/src/internal_events/logplex.rs index b08df7c624d4f..160947fb0d147 100644 --- a/src/internal_events/logplex.rs +++ b/src/internal_events/logplex.rs @@ -36,7 +36,7 @@ impl InternalEvent for HerokuLogplexRequestReadError { error_type = error_type::READER_FAILED, error_code = io_error_code(&self.error), stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/loki.rs b/src/internal_events/loki.rs index 6a5e7567b65b2..2c95d524c655f 100644 --- a/src/internal_events/loki.rs +++ b/src/internal_events/loki.rs @@ -12,6 +12,7 @@ impl InternalEvent for LokiEventUnlabeledError { error_code = "unlabeled_event", error_type = error_type::CONDITION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( @@ -38,6 +39,7 @@ impl InternalEvent for LokiOutOfOrderEventDroppedError { error_code = "out_of_order", error_type = error_type::CONDITION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); emit!(ComponentEventsDropped:: { @@ -66,6 +68,7 @@ impl InternalEvent for LokiOutOfOrderEventRewritten { message = "Timestamps rewritten.", count = self.count, reason = "out_of_order", + internal_log_rate_limit = true, ); counter!("rewritten_timestamp_events_total").increment(self.count as u64); } @@ -83,6 +86,7 @@ impl InternalEvent for LokiTimestampNonParsableEventsDropped { error_code = "non-parsable_timestamp", error_type = error_type::CONDITION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); emit!(ComponentEventsDropped:: { count: 1, reason }); diff --git a/src/internal_events/lua.rs b/src/internal_events/lua.rs index 392f1b8b26199..2805b5b57917d 100644 --- a/src/internal_events/lua.rs +++ b/src/internal_events/lua.rs @@ -28,6 +28,7 @@ impl InternalEvent for LuaScriptError { error_code = mlua_error_code(&self.error), error_type = error_type::COMMAND_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -57,7 +58,7 @@ impl InternalEvent for LuaBuildError { error_type = error_type::SCRIPT_FAILED, error_code = lua_build_error_code(&self.error), stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/metric_to_log.rs b/src/internal_events/metric_to_log.rs index 5bbceeb5017c6..79a9d117054a8 100644 --- a/src/internal_events/metric_to_log.rs +++ b/src/internal_events/metric_to_log.rs @@ -16,6 +16,7 @@ impl InternalEvent for MetricToLogSerializeError { error = ?self.error, error_type = error_type::ENCODER_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true ); counter!( "component_errors_total", diff --git a/src/internal_events/mongodb_metrics.rs b/src/internal_events/mongodb_metrics.rs index cf57b7af94e6a..8d292342d3f0d 100644 --- a/src/internal_events/mongodb_metrics.rs +++ b/src/internal_events/mongodb_metrics.rs @@ -48,7 +48,7 @@ impl InternalEvent for MongoDbMetricsRequestError<'_> { error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -72,7 +72,7 @@ impl InternalEvent for MongoDbMetricsBsonParseError<'_> { error = ?self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/mqtt.rs b/src/internal_events/mqtt.rs index 507f4aea2464c..d20ccad0bad98 100644 --- a/src/internal_events/mqtt.rs +++ b/src/internal_events/mqtt.rs @@ -18,7 +18,7 @@ impl InternalEvent for MqttConnectionError { error_code = "mqtt_connection_error", error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/nginx_metrics.rs b/src/internal_events/nginx_metrics.rs index d6561ccf21096..02f3309aec3a1 100644 --- a/src/internal_events/nginx_metrics.rs +++ b/src/internal_events/nginx_metrics.rs @@ -48,7 +48,7 @@ impl InternalEvent for NginxMetricsRequestError<'_> { error = %self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -73,7 +73,7 @@ impl InternalEvent for NginxMetricsStubStatusParseError<'_> { error = %self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/parser.rs b/src/internal_events/parser.rs index 3638ab4189e7b..84cd0556b4821 100644 --- a/src/internal_events/parser.rs +++ b/src/internal_events/parser.rs @@ -30,6 +30,7 @@ impl InternalEvent for ParserMatchError<'_> { error_type = error_type::CONDITION_FAILED, stage = error_stage::PROCESSING, field = &truncate_string_at(&String::from_utf8_lossy(self.value), 60)[..], + internal_log_rate_limit = true ); counter!( "component_errors_total", diff --git a/src/internal_events/postgresql_metrics.rs b/src/internal_events/postgresql_metrics.rs index d7979f8dd1185..8843140cb54e7 100644 --- a/src/internal_events/postgresql_metrics.rs +++ b/src/internal_events/postgresql_metrics.rs @@ -16,7 +16,7 @@ impl InternalEvent for PostgresqlMetricsCollectError<'_> { error_type = error_type::REQUEST_FAILED, stage = error_stage::RECEIVING, endpoint = %self.endpoint, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/process.rs b/src/internal_events/process.rs index ef30c1ea9e7e2..5a54154eca849 100644 --- a/src/internal_events/process.rs +++ b/src/internal_events/process.rs @@ -73,6 +73,7 @@ impl InternalEvent for VectorReloadError { error_code = "reload", error_type = error_type::CONFIGURATION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -94,6 +95,7 @@ impl InternalEvent for VectorConfigLoadError { error_code = "config_load", error_type = error_type::CONFIGURATION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -115,6 +117,7 @@ impl InternalEvent for VectorRecoveryError { error_code = "recovery", error_type = error_type::CONFIGURATION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/prometheus.rs b/src/internal_events/prometheus.rs index 2285ee1323773..752e8e238fac4 100644 --- a/src/internal_events/prometheus.rs +++ b/src/internal_events/prometheus.rs @@ -24,7 +24,7 @@ impl InternalEvent for PrometheusParseError<'_> { error = ?self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); debug!( message = %format!("Failed to parse response:\n\n{}\n\n", self.body), @@ -53,7 +53,7 @@ impl InternalEvent for PrometheusRemoteWriteParseError { error = ?self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -74,6 +74,7 @@ impl InternalEvent for PrometheusNormalizationError { message = normalization_reason, error_type = error_type::CONVERSION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/pulsar.rs b/src/internal_events/pulsar.rs index 416768fcff6a5..c03dea3870086 100644 --- a/src/internal_events/pulsar.rs +++ b/src/internal_events/pulsar.rs @@ -19,7 +19,7 @@ impl InternalEvent for PulsarSendingError { error = %self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -46,7 +46,7 @@ impl InternalEvent for PulsarPropertyExtractionError { error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, property_field = %self.property_field, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -105,7 +105,7 @@ registered_event!( error_code = "reading_message", error_type = error_type::READER_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); self.read_errors.increment(1_u64); @@ -117,7 +117,7 @@ registered_event!( error_code = "acknowledge_message", error_type = error_type::ACKNOWLEDGMENT_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); self.ack_errors.increment(1_u64); @@ -129,7 +129,7 @@ registered_event!( error_code = "negative_acknowledge_message", error_type = error_type::ACKNOWLEDGMENT_FAILED, stage = error_stage::RECEIVING, - + internal_log_rate_limit = true, ); self.nack_errors.increment(1_u64); diff --git a/src/internal_events/redis.rs b/src/internal_events/redis.rs index 32af514f4215d..0ac9e9e59d417 100644 --- a/src/internal_events/redis.rs +++ b/src/internal_events/redis.rs @@ -23,7 +23,7 @@ impl InternalEvent for RedisReceiveEventError { error_code = %self.error_code, error_type = error_type::READER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/reduce.rs b/src/internal_events/reduce.rs index c62c1ed3639fa..b9887aaf54bb7 100644 --- a/src/internal_events/reduce.rs +++ b/src/internal_events/reduce.rs @@ -26,7 +26,7 @@ impl InternalEvent for ReduceAddEventError { error = ?self.error, error_type = error_type::CONDITION_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/remap.rs b/src/internal_events/remap.rs index 93193941b23fe..d05e6dfb9c874 100644 --- a/src/internal_events/remap.rs +++ b/src/internal_events/remap.rs @@ -19,7 +19,7 @@ impl InternalEvent for RemapMappingError { error = ?self.error, error_type = error_type::CONVERSION_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/sematext_metrics.rs b/src/internal_events/sematext_metrics.rs index 67839e8c90e79..b4b5be737fb2e 100644 --- a/src/internal_events/sematext_metrics.rs +++ b/src/internal_events/sematext_metrics.rs @@ -19,7 +19,7 @@ impl InternalEvent for SematextMetricsInvalidMetricError<'_> { stage = error_stage::PROCESSING, value = ?self.metric.value(), kind = ?self.metric.kind(), - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -46,7 +46,7 @@ impl InternalEvent for SematextMetricsEncodeEventError error = %self.error, error_type = error_type::ENCODER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/socket.rs b/src/internal_events/socket.rs index 389adb0f478af..5f2f7aac301e4 100644 --- a/src/internal_events/socket.rs +++ b/src/internal_events/socket.rs @@ -124,7 +124,7 @@ impl InternalEvent for SocketBindError { error_type = error_type::IO_FAILED, stage = error_stage::INITIALIZING, %mode, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -191,7 +191,7 @@ impl InternalEvent for SocketReceiveError { error_type = error_type::READER_FAILED, stage = error_stage::RECEIVING, %mode, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -221,7 +221,7 @@ impl InternalEvent for SocketSendError { error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, %mode, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/splunk_hec.rs b/src/internal_events/splunk_hec.rs index e4853c1a8b40e..4ff5838cdc0b7 100644 --- a/src/internal_events/splunk_hec.rs +++ b/src/internal_events/splunk_hec.rs @@ -33,7 +33,7 @@ mod sink { error_code = "serializing_json", error_type = error_type::ENCODER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -62,7 +62,7 @@ mod sink { stage = error_stage::PROCESSING, value = ?self.value, kind = ?self.kind, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -92,7 +92,7 @@ mod sink { error_code = "invalid_response", error_type = error_type::PARSER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -118,7 +118,7 @@ mod sink { error_code = "indexer_ack_failed", error_type = error_type::ACKNOWLEDGMENT_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -143,7 +143,7 @@ mod sink { error_code = "indexer_ack_unavailable", error_type = error_type::ACKNOWLEDGMENT_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/statsd_sink.rs b/src/internal_events/statsd_sink.rs index b7ee84cfbc457..165ef885eabbd 100644 --- a/src/internal_events/statsd_sink.rs +++ b/src/internal_events/statsd_sink.rs @@ -20,7 +20,7 @@ impl InternalEvent for StatsdInvalidMetricError<'_> { stage = error_stage::PROCESSING, value = ?self.value, kind = ?self.kind, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/tag_cardinality_limit.rs b/src/internal_events/tag_cardinality_limit.rs index e0a79e583bbcc..4c0c315eb11c1 100644 --- a/src/internal_events/tag_cardinality_limit.rs +++ b/src/internal_events/tag_cardinality_limit.rs @@ -14,6 +14,7 @@ impl InternalEvent for TagCardinalityLimitRejectingEvent<'_> { metric_name = self.metric_name, tag_key = self.tag_key, tag_value = self.tag_value, + internal_log_rate_limit = true, ); counter!("tag_value_limit_exceeded_total").increment(1); @@ -37,6 +38,7 @@ impl InternalEvent for TagCardinalityLimitRejectingTag<'_> { metric_name = self.metric_name, tag_key = self.tag_key, tag_value = self.tag_value, + internal_log_rate_limit = true, ); counter!("tag_value_limit_exceeded_total").increment(1); } diff --git a/src/internal_events/tcp.rs b/src/internal_events/tcp.rs index ae050b99be309..23ec1fc88e830 100644 --- a/src/internal_events/tcp.rs +++ b/src/internal_events/tcp.rs @@ -60,6 +60,7 @@ impl InternalEvent for TcpSocketError<'_, E> { peer_addr = ?self.peer_addr, error_type = error_type::CONNECTION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -88,6 +89,7 @@ impl InternalEvent for TcpSocketTlsConnectionError { debug!( message = "Connection error, probably a healthcheck.", error = %self.error, + internal_log_rate_limit = true, ); } _ => { @@ -97,6 +99,7 @@ impl InternalEvent for TcpSocketTlsConnectionError { error_code = "connection_failed", error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -124,6 +127,7 @@ impl InternalEvent for TcpSendAckError { error_code = "ack_failed", error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/template.rs b/src/internal_events/template.rs index 610acf9e7a166..6ecc62b859363 100644 --- a/src/internal_events/template.rs +++ b/src/internal_events/template.rs @@ -23,7 +23,7 @@ impl InternalEvent for TemplateRenderingError<'_> { error = %self.error, error_type = error_type::TEMPLATE_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( @@ -43,7 +43,7 @@ impl InternalEvent for TemplateRenderingError<'_> { error = %self.error, error_type = error_type::TEMPLATE_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); } } diff --git a/src/internal_events/udp.rs b/src/internal_events/udp.rs index b983543e6cae1..15e9df4c2b07b 100644 --- a/src/internal_events/udp.rs +++ b/src/internal_events/udp.rs @@ -47,6 +47,7 @@ impl InternalEvent for UdpSendIncompleteError { dropped = self.data_size - self.sent, error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/unix.rs b/src/internal_events/unix.rs index f66327011da98..a45777ab8f2e1 100644 --- a/src/internal_events/unix.rs +++ b/src/internal_events/unix.rs @@ -54,7 +54,7 @@ impl InternalEvent for UnixSocketError<'_, E> { path = ?self.path, error_type = error_type::CONNECTION_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -80,7 +80,7 @@ impl InternalEvent for UnixSocketSendError<'_, E> { path = ?self.path, error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -109,6 +109,7 @@ impl InternalEvent for UnixSendIncompleteError { dropped = self.data_size - self.sent, error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -136,7 +137,7 @@ impl InternalEvent for UnixSocketFileDeleteError<'_> { error_code = "delete_socket_file", error_type = error_type::WRITER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/websocket.rs b/src/internal_events/websocket.rs index 20c8b3091b09b..f647cd5cdfaf9 100644 --- a/src/internal_events/websocket.rs +++ b/src/internal_events/websocket.rs @@ -39,7 +39,7 @@ impl InternalEvent for WebSocketConnectionFailedError { error_code = "websocket_connection_error", error_type = error_type::CONNECTION_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -82,7 +82,7 @@ impl InternalEvent for WebSocketConnectionError { error_code = "websocket_connection_error", error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/websocket_server.rs b/src/internal_events/websocket_server.rs index b3209c70a9994..b928385e669b9 100644 --- a/src/internal_events/websocket_server.rs +++ b/src/internal_events/websocket_server.rs @@ -43,7 +43,7 @@ impl InternalEvent for WebSocketListenerConnectionFailedError { error_code = "ws_connection_error", error_type = error_type::CONNECTION_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); let mut all_tags = self.extra_tags.clone(); all_tags.extend([ @@ -100,7 +100,7 @@ impl InternalEvent for WebSocketListenerSendError { error_code = "ws_server_connection_error", error_type = error_type::WRITER_FAILED, stage = error_stage::SENDING, - + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/internal_events/windows.rs b/src/internal_events/windows.rs index f44c0a26bc8f3..994b89345e3c0 100644 --- a/src/internal_events/windows.rs +++ b/src/internal_events/windows.rs @@ -102,6 +102,7 @@ impl InternalEvent for WindowsServiceDoesNotExistError<'_> { error_code = "service_missing", error_type = error_type::CONDITION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index cff96f92fd3dc..79d873f2044c5 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -200,6 +200,7 @@ pub fn position_reserved_attr_event_root( message = "Semantic meaning is defined, but the event path already exists. Renaming to not overwrite.", meaning = meaning, renamed = &rename_attr, + internal_log_rate_limit = true, ); log.rename_key(desired_path, rename_path); } diff --git a/src/sinks/datadog/traces/apm_stats/flusher.rs b/src/sinks/datadog/traces/apm_stats/flusher.rs index e529f49869dbd..d1ee1e73e949b 100644 --- a/src/sinks/datadog/traces/apm_stats/flusher.rs +++ b/src/sinks/datadog/traces/apm_stats/flusher.rs @@ -70,6 +70,7 @@ pub async fn flush_apm_stats_thread( } Err(_) => { error!( + internal_log_rate_limit = true, message = "Tokio Sender unexpectedly dropped." ); break; diff --git a/src/sinks/gcp/stackdriver/logs/encoder.rs b/src/sinks/gcp/stackdriver/logs/encoder.rs index 82fa672f1d58e..76359ca5fdd7c 100644 --- a/src/sinks/gcp/stackdriver/logs/encoder.rs +++ b/src/sinks/gcp/stackdriver/logs/encoder.rs @@ -173,6 +173,7 @@ pub(super) fn remap_severity(severity: Value) -> Value { warn!( message = "Unknown severity value string, using DEFAULT.", value = %s, + internal_log_rate_limit = true ); 0 } diff --git a/src/sinks/util/adaptive_concurrency/controller.rs b/src/sinks/util/adaptive_concurrency/controller.rs index 0f05639a92ed0..ed7fe472cf3f6 100644 --- a/src/sinks/util/adaptive_concurrency/controller.rs +++ b/src/sinks/util/adaptive_concurrency/controller.rs @@ -293,6 +293,7 @@ where warn!( message = "Unhandled error response.", %error, + internal_log_rate_limit = true ); false } diff --git a/src/sinks/util/retries.rs b/src/sinks/util/retries.rs index 9daaec01ca5b4..cdb5310d9b361 100644 --- a/src/sinks/util/retries.rs +++ b/src/sinks/util/retries.rs @@ -152,6 +152,7 @@ where error!( message = "OK/retry response but retries exhausted; dropping the request.", reason = ?reason, + internal_log_rate_limit = true, ); return None; } @@ -197,7 +198,7 @@ where error!( message = "Non-retriable error; dropping the request.", %error, - + internal_log_rate_limit = true, ); None } @@ -211,6 +212,7 @@ where error!( message = "Unexpected error type; dropping the request.", %error, + internal_log_rate_limit = true ); None } diff --git a/src/sources/datadog_agent/metrics.rs b/src/sources/datadog_agent/metrics.rs index 7e5df463a16c2..4cac3d47a3026 100644 --- a/src/sources/datadog_agent/metrics.rs +++ b/src/sources/datadog_agent/metrics.rs @@ -182,7 +182,10 @@ fn decode_datadog_sketches( ) -> Result, ErrorMessage> { if body.is_empty() { // The datadog agent may send an empty payload as a keep alive - debug!(message = "Empty payload ignored.",); + debug!( + message = "Empty payload ignored.", + internal_log_rate_limit = true + ); return Ok(Vec::new()); } diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 8417ce2dad7d4..c2daf516746de 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -407,6 +407,7 @@ fn line_to_events( warn!( message = "Line didn't match expected logplex format, so raw message is forwarded.", fields = parts.len(), + internal_log_rate_limit = true ); events.push(LogEvent::from_str_legacy(line).into()) diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs index 24ea971752eed..c60d32d5abb66 100644 --- a/src/sources/socket/udp.rs +++ b/src/sources/socket/udp.rs @@ -231,6 +231,7 @@ pub(super) fn udp( warn!( message = "Discarding frame larger than max_length.", max_length = max_length, + internal_log_rate_limit = true ); continue; } @@ -258,6 +259,7 @@ pub(super) fn udp( warn!( message = "Discarding frame larger than max_length.", max_length = max_length, + internal_log_rate_limit = true ); } diff --git a/src/transforms/lua/v1/mod.rs b/src/transforms/lua/v1/mod.rs index f196d645dd9a8..8f8817bedb5c8 100644 --- a/src/transforms/lua/v1/mod.rs +++ b/src/transforms/lua/v1/mod.rs @@ -255,6 +255,7 @@ impl mlua::UserData for LuaEvent { message = "Could not set field to Lua value of invalid type, dropping field.", field = key.as_str(), + internal_log_rate_limit = true ); this.inner.as_mut_log().remove(&key_path); } From a7dc891d305f08190e0cd9f83a6d924b05cd3eaa Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 8 Aug 2025 16:08:59 -0400 Subject: [PATCH 2/6] added internal_log_rate_limit = true to new traces --- src/config/watcher.rs | 11 ++++++----- src/internal_events/websocket.rs | 2 ++ src/sinks/amqp/channel.rs | 3 ++- src/sources/http_client/client.rs | 6 +++--- src/sources/util/framestream.rs | 2 +- src/sources/websocket/source.rs | 19 ++++++++++--------- src/topology/builder.rs | 5 +++-- 7 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 8e643ba557ceb..54a1b454aea40 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -122,25 +122,26 @@ pub fn spawn_thread<'a>( if !changed_components.is_empty() { info!( "Component {:?} configuration changed.", - changed_components.keys() + changed_components.keys(), + internal_log_rate_limit = true ); if changed_components .iter() .all(|(_, t)| *t == ComponentType::EnrichmentTable) { - info!("Only enrichment tables have changed.",); + info!("Only enrichment tables have changed.", internal_log_rate_limit = true); _ = signal_tx.send(crate::signal::SignalTo::ReloadEnrichmentTables).map_err(|error| { - error!(message = "Unable to reload enrichment tables.", cause = %error) + error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true,) }); } else { _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(changed_components.into_keys().collect())).map_err(|error| { - error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error) + error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true,) }); } } else { _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk) .map_err(|error| { - error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) + error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true) }); } } else { diff --git a/src/internal_events/websocket.rs b/src/internal_events/websocket.rs index f647cd5cdfaf9..3dac157e31472 100644 --- a/src/internal_events/websocket.rs +++ b/src/internal_events/websocket.rs @@ -199,6 +199,7 @@ impl InternalEvent for WebSocketReceiveError<'_> { error_code = "websocket_receive_error", error_type = error_type::CONNECTION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", @@ -228,6 +229,7 @@ impl InternalEvent for WebSocketSendError<'_> { error_code = "websocket_send_error", error_type = error_type::CONNECTION_FAILED, stage = error_stage::PROCESSING, + internal_log_rate_limit = true, ); counter!( "component_errors_total", diff --git a/src/sinks/amqp/channel.rs b/src/sinks/amqp/channel.rs index 862ff06e5d0dc..6bb8f6f47c475 100644 --- a/src/sinks/amqp/channel.rs +++ b/src/sinks/amqp/channel.rs @@ -41,7 +41,8 @@ impl deadpool::managed::Manager for AmqpSinkChannelManager { let channel = Self::new_channel(&self.config).await?; info!( message = "Created a new channel to the AMQP broker.", - id = channel.id() + id = channel.id(), + internal_log_rate_limit = true, ); Ok(channel) } diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 2fde06fc6b88a..85b792c9ef545 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -251,7 +251,7 @@ impl Query { let warnings = Formatter::new(param.value(), compilation_result.warnings) .colored() .to_string(); - warn!(message = "VRL compilation warnings.", %warnings); + warn!(message = "VRL compilation warnings.", %warnings, internal_log_rate_limit = true); } Some(compilation_result.program) } @@ -259,7 +259,7 @@ impl Query { let error = Formatter::new(param.value(), diagnostics) .colored() .to_string(); - warn!(message = "VRL compilation failed.", %error); + warn!(message = "VRL compilation failed.", %error, internal_log_rate_limit = true); None } } @@ -428,7 +428,7 @@ fn resolve_vrl(value: &str, program: &Program) -> Option { Runtime::default() .resolve(&mut target, program, &timezone) .map_err(|error| { - warn!(message = "VRL runtime error.", source = %value, %error); + warn!(message = "VRL runtime error.", source = %value, %error, internal_log_rate_limit = true); }) .ok() .and_then(|vrl_value| { diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 2daee6f70f359..a060c37d06c5d 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -678,7 +678,7 @@ async fn handle_tcp_frame( .await; } else if let Some(event) = frame_handler.handle_event(received_from, frame) { if let Err(e) = event_sink.send_event(event).await { - error!("Error sending event: {e:?}."); + error!("Error sending event: {e:?}.", internal_log_rate_limit = true); } } } diff --git a/src/sources/websocket/source.rs b/src/sources/websocket/source.rs index 6a1c98e2b1c57..b9316a1a363ee 100644 --- a/src/sources/websocket/source.rs +++ b/src/sources/websocket/source.rs @@ -94,7 +94,7 @@ impl WebSocketSource { loop { let result = tokio::select! { _ = cx.shutdown.clone() => { - info!("Received shutdown signal."); + info!("Received shutdown signal.", internal_log_rate_limit = true); break; }, @@ -117,16 +117,17 @@ impl WebSocketSource { warn!( message = "Connection closed by server.", code = %frame.code, - reason = %frame.reason + reason = %frame.reason, + internal_log_rate_limit = true ); emit!(WebSocketConnectionShutdown); } WebSocketSourceError::RemoteClosedEmpty => { - warn!("Connection closed by server without a close frame."); + warn!("Connection closed by server without a close frame.", internal_log_rate_limit = true); emit!(WebSocketConnectionShutdown); } WebSocketSourceError::PongTimeout => { - error!("Disconnecting due to pong timeout."); + error!("Disconnecting due to pong timeout.", internal_log_rate_limit = true); emit!(WebSocketReceiveError { error: &TungsteniteError::Io(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -140,7 +141,7 @@ impl WebSocketSource { if is_closed(&ws_err) { emit!(WebSocketConnectionShutdown); } - error!(message = "WebSocket connection error.", error = %ws_err); + error!(message = "WebSocket connection error.", error = %ws_err, internal_log_rate_limit = true); } // These errors should only happen during `connect` or `reconnect`, // not in the main loop's result. @@ -194,7 +195,7 @@ impl WebSocketSource { Message::Ping(_) => Ok(()), Message::Close(frame) => self.handle_close_frame(frame), Message::Frame(_) => { - warn!("Unsupported message type received: frame."); + warn!("Unsupported message type received: frame.", internal_log_rate_limit = true); Ok(()) } } @@ -241,7 +242,7 @@ impl WebSocketSource { }); if let Err(error) = out.send_batch(events_with_meta).await { - error!(message = "Error sending events.", %error); + error!(message = "Error sending events.", %error, internal_log_rate_limit = true); } } Err(error) => { @@ -265,14 +266,14 @@ impl WebSocketSource { ws_sink: &mut WebSocketSink, ws_source: &mut WebSocketStream, ) -> Result<(), WebSocketSourceError> { - info!("Reconnecting to WebSocket..."); + info!("Reconnecting to WebSocket...", internal_log_rate_limit = true); let (new_sink, new_source) = self.connect(out).await?; *ws_sink = new_sink; *ws_source = new_source; - info!("Reconnected."); + info!("Reconnected to Websocket.", internal_log_rate_limit = true); Ok(()) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 454b901399ee1..48ba7d7438319 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -191,7 +191,8 @@ impl<'a> Builder<'a> { // Just report the error and continue. error!(message = "Unable to add index to reloaded enrichment table.", table = ?name.to_string(), - %error); + %error, + internal_log_rate_limit = true); continue 'tables; } } @@ -723,7 +724,7 @@ pub async fn reload_enrichment_tables(config: &Config) { let mut table = match table_outer.inner.build(&config.global).await { Ok(table) => table, Err(error) => { - error!("Enrichment table \"{name}\" reload failed: {error}"); + error!("Enrichment table \"{name}\" reload failed: {error}", internal_log_rate_limit = true); continue; } }; From 7d71a3a5b7a08546c21929df7c6b4be00a39f8ae Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 8 Aug 2025 16:13:42 -0400 Subject: [PATCH 3/6] cargo fmt --- src/config/watcher.rs | 5 ++++- src/sources/util/framestream.rs | 5 ++++- src/sources/websocket/source.rs | 20 ++++++++++++++++---- src/topology/builder.rs | 5 ++++- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 54a1b454aea40..8b63f3826e13c 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -129,7 +129,10 @@ pub fn spawn_thread<'a>( .iter() .all(|(_, t)| *t == ComponentType::EnrichmentTable) { - info!("Only enrichment tables have changed.", internal_log_rate_limit = true); + info!( + "Only enrichment tables have changed.", + internal_log_rate_limit = true + ); _ = signal_tx.send(crate::signal::SignalTo::ReloadEnrichmentTables).map_err(|error| { error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true,) }); diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index a060c37d06c5d..48a5a21b43680 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -678,7 +678,10 @@ async fn handle_tcp_frame( .await; } else if let Some(event) = frame_handler.handle_event(received_from, frame) { if let Err(e) = event_sink.send_event(event).await { - error!("Error sending event: {e:?}.", internal_log_rate_limit = true); + error!( + "Error sending event: {e:?}.", + internal_log_rate_limit = true + ); } } } diff --git a/src/sources/websocket/source.rs b/src/sources/websocket/source.rs index b9316a1a363ee..a6c27e0d80bf6 100644 --- a/src/sources/websocket/source.rs +++ b/src/sources/websocket/source.rs @@ -123,11 +123,17 @@ impl WebSocketSource { emit!(WebSocketConnectionShutdown); } WebSocketSourceError::RemoteClosedEmpty => { - warn!("Connection closed by server without a close frame.", internal_log_rate_limit = true); + warn!( + "Connection closed by server without a close frame.", + internal_log_rate_limit = true + ); emit!(WebSocketConnectionShutdown); } WebSocketSourceError::PongTimeout => { - error!("Disconnecting due to pong timeout.", internal_log_rate_limit = true); + error!( + "Disconnecting due to pong timeout.", + internal_log_rate_limit = true + ); emit!(WebSocketReceiveError { error: &TungsteniteError::Io(std::io::Error::new( std::io::ErrorKind::TimedOut, @@ -195,7 +201,10 @@ impl WebSocketSource { Message::Ping(_) => Ok(()), Message::Close(frame) => self.handle_close_frame(frame), Message::Frame(_) => { - warn!("Unsupported message type received: frame.", internal_log_rate_limit = true); + warn!( + "Unsupported message type received: frame.", + internal_log_rate_limit = true + ); Ok(()) } } @@ -266,7 +275,10 @@ impl WebSocketSource { ws_sink: &mut WebSocketSink, ws_source: &mut WebSocketStream, ) -> Result<(), WebSocketSourceError> { - info!("Reconnecting to WebSocket...", internal_log_rate_limit = true); + info!( + "Reconnecting to WebSocket...", + internal_log_rate_limit = true + ); let (new_sink, new_source) = self.connect(out).await?; diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 48ba7d7438319..852fd4de69b8e 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -724,7 +724,10 @@ pub async fn reload_enrichment_tables(config: &Config) { let mut table = match table_outer.inner.build(&config.global).await { Ok(table) => table, Err(error) => { - error!("Enrichment table \"{name}\" reload failed: {error}", internal_log_rate_limit = true); + error!( + "Enrichment table \"{name}\" reload failed: {error}", + internal_log_rate_limit = true + ); continue; } }; From ec991de0080097a5f1c80650abebeae33379db16 Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 8 Aug 2025 16:37:52 -0400 Subject: [PATCH 4/6] Fix compilation errors and remaning missed logs --- src/components/validation/resources/http.rs | 4 ++-- src/config/watcher.rs | 12 ++++++------ src/internal_events/expansion.rs | 4 ++-- src/sinks/redis/sink.rs | 7 +++++-- src/sources/util/framestream.rs | 9 ++++++--- src/sources/websocket/source.rs | 20 ++++++++++---------- src/topology/builder.rs | 11 +++++++---- 7 files changed, 38 insertions(+), 29 deletions(-) diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index 59c4504654ce9..0be7d4de4bd3c 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -336,11 +336,11 @@ impl HttpResourceOutputContext<'_> { // entire payload which may contain multiple frames and their delimiters. Ok(Some((events, decoded_byte_size))) => { if should_reject { - info!("HTTP server external output resource decoded {decoded_byte_size} bytes but test case configured to reject."); + info!(internal_log_rate_limit = true, "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.", ); } else { let mut output_runner_metrics = output_runner_metrics.lock().await; - info!("HTTP server external output resource decoded {decoded_byte_size} bytes."); + info!(internal_log_rate_limit = true, "HTTP server external output resource decoded {decoded_byte_size:?} bytes."); // Update the runner metrics for the received events. This will later // be used in the Validators, as the "expected" case. diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 8b63f3826e13c..641c29343121f 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -121,24 +121,24 @@ pub fn spawn_thread<'a>( info!("Configuration file changed."); if !changed_components.is_empty() { info!( + internal_log_rate_limit = true, "Component {:?} configuration changed.", - changed_components.keys(), - internal_log_rate_limit = true + changed_components.keys() ); if changed_components .iter() .all(|(_, t)| *t == ComponentType::EnrichmentTable) { info!( - "Only enrichment tables have changed.", - internal_log_rate_limit = true + internal_log_rate_limit = true, + "Only enrichment tables have changed." ); _ = signal_tx.send(crate::signal::SignalTo::ReloadEnrichmentTables).map_err(|error| { - error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true,) + error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true) }); } else { _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(changed_components.into_keys().collect())).map_err(|error| { - error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true,) + error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true) }); } } else { diff --git a/src/internal_events/expansion.rs b/src/internal_events/expansion.rs index 4077d4c25f8ab..62a96afca1068 100644 --- a/src/internal_events/expansion.rs +++ b/src/internal_events/expansion.rs @@ -19,7 +19,7 @@ impl InternalEvent for PairExpansionError<'_> { error = %self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); counter!( @@ -39,7 +39,7 @@ impl InternalEvent for PairExpansionError<'_> { error = %self.error, error_type = error_type::PARSER_FAILED, stage = error_stage::PROCESSING, - + internal_log_rate_limit = true, ); } } diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index aaea351fece3e..0f17d769268b2 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -166,7 +166,10 @@ impl RedisConnection { if !repairing { // Wait until a repair is needed if let Err(error) = conn_recv.wait_for(|state| state.needs_repair()).await { - warn!("Connection state channel was dropped {error:?}."); + warn!( + internal_log_rate_limit = true, + "Connection state channel was dropped {error:?}." + ); continue; } @@ -189,7 +192,7 @@ impl RedisConnection { } } Err(error) => { - warn!("Failed to repair ConnectionManager via sentinel (gen: {current_generation}): {error:?}."); + warn!(internal_log_rate_limit = true, "Failed to repair ConnectionManager via sentinel (gen: {current_generation}): {error:?}."); sleep(Duration::from_millis(250)).await; continue; } diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 48a5a21b43680..a864d42d193a5 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -679,8 +679,8 @@ async fn handle_tcp_frame( } else if let Some(event) = frame_handler.handle_event(received_from, frame) { if let Err(e) = event_sink.send_event(event).await { error!( - "Error sending event: {e:?}.", - internal_log_rate_limit = true + internal_log_rate_limit = true, + "Error sending event: {e:?}." ); } } @@ -867,7 +867,10 @@ fn build_framestream_source( let handler = async move { if let Err(e) = event_sink.send_event_stream(&mut events).await { - error!("Error sending event: {:?}.", e); + error!( + internal_log_rate_limit = true, + "Error sending event: {:?}.", e + ); } info!("Finished sending."); diff --git a/src/sources/websocket/source.rs b/src/sources/websocket/source.rs index a6c27e0d80bf6..c54f4cb717365 100644 --- a/src/sources/websocket/source.rs +++ b/src/sources/websocket/source.rs @@ -94,7 +94,7 @@ impl WebSocketSource { loop { let result = tokio::select! { _ = cx.shutdown.clone() => { - info!("Received shutdown signal.", internal_log_rate_limit = true); + info!(internal_log_rate_limit = true, "Received shutdown signal."); break; }, @@ -124,15 +124,15 @@ impl WebSocketSource { } WebSocketSourceError::RemoteClosedEmpty => { warn!( - "Connection closed by server without a close frame.", - internal_log_rate_limit = true + internal_log_rate_limit = true, + "Connection closed by server without a close frame." ); emit!(WebSocketConnectionShutdown); } WebSocketSourceError::PongTimeout => { error!( - "Disconnecting due to pong timeout.", - internal_log_rate_limit = true + internal_log_rate_limit = true, + "Disconnecting due to pong timeout." ); emit!(WebSocketReceiveError { error: &TungsteniteError::Io(std::io::Error::new( @@ -202,8 +202,8 @@ impl WebSocketSource { Message::Close(frame) => self.handle_close_frame(frame), Message::Frame(_) => { warn!( - "Unsupported message type received: frame.", - internal_log_rate_limit = true + internal_log_rate_limit = true, + "Unsupported message type received: frame." ); Ok(()) } @@ -276,8 +276,8 @@ impl WebSocketSource { ws_source: &mut WebSocketStream, ) -> Result<(), WebSocketSourceError> { info!( - "Reconnecting to WebSocket...", - internal_log_rate_limit = true + internal_log_rate_limit = true, + "Reconnecting to WebSocket..." ); let (new_sink, new_source) = self.connect(out).await?; @@ -285,7 +285,7 @@ impl WebSocketSource { *ws_sink = new_sink; *ws_source = new_source; - info!("Reconnected to Websocket.", internal_log_rate_limit = true); + info!(internal_log_rate_limit = true, "Reconnected to Websocket."); Ok(()) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 852fd4de69b8e..a872435e968e4 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -725,8 +725,8 @@ pub async fn reload_enrichment_tables(config: &Config) { Ok(table) => table, Err(error) => { error!( + internal_log_rate_limit = true, "Enrichment table \"{name}\" reload failed: {error}", - internal_log_rate_limit = true ); continue; } @@ -742,9 +742,12 @@ pub async fn reload_enrichment_tables(config: &Config) { // If there is an error adding an index we do not want to use the reloaded // data, the previously loaded data will still need to be used. // Just report the error and continue. - error!(message = "Unable to add index to reloaded enrichment table.", - table = ?name.to_string(), - %error); + error!( + internal_log_rate_limit = true, + message = "Unable to add index to reloaded enrichment table.", + table = ?name.to_string(), + %error + ); continue 'tables; } } From 405b6c20abcbed9304fcd2356b5c610d68a1df9a Mon Sep 17 00:00:00 2001 From: Thomas Date: Mon, 11 Aug 2025 09:34:11 -0400 Subject: [PATCH 5/6] Checkout file from dffcb5a~1 --- lib/tracing-limit/src/lib.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index 44fb9848de417..bd5952bf39760 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -389,7 +389,7 @@ impl Visit for RateLimitedSpanKeys { } fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { - self.record(field, format!("{value:?}").into()); + self.record(field, format!("{:?}", value).into()); } } @@ -437,7 +437,7 @@ impl Visit for MessageVisitor { fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { if self.message.is_none() && field.name() == MESSAGE_FIELD { - self.message = Some(format!("{value:?}")); + self.message = Some(format!("{:?}", value)); } } } @@ -577,7 +577,9 @@ mod test { info_span!("span", component_id = &key, vrl_position = &line_number); let _enter = span.enter(); info!( - message = format!("Hello {key} on line_number {line_number}!").as_str(), + message = + format!("Hello {} on line_number {}!", key, line_number).as_str(), + internal_log_rate_limit = true ); } } @@ -637,7 +639,9 @@ mod test { for key in &["foo", "bar"] { for line_number in &[1, 2] { info!( - message = format!("Hello {key} on line_number {line_number}!").as_str(), + message = + format!("Hello {} on line_number {}!", key, line_number).as_str(), + internal_log_rate_limit = true, component_id = &key, vrl_position = &line_number ); From 9b3eca6d2e296170b5fafa0c79bc21f23fe72f68 Mon Sep 17 00:00:00 2001 From: Thomas Date: Mon, 11 Aug 2025 09:43:39 -0400 Subject: [PATCH 6/6] Fix clippy --- lib/tracing-limit/src/lib.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/tracing-limit/src/lib.rs b/lib/tracing-limit/src/lib.rs index bd5952bf39760..7367f8c8c7612 100644 --- a/lib/tracing-limit/src/lib.rs +++ b/lib/tracing-limit/src/lib.rs @@ -389,7 +389,7 @@ impl Visit for RateLimitedSpanKeys { } fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { - self.record(field, format!("{:?}", value).into()); + self.record(field, format!("{value:?}").into()); } } @@ -437,7 +437,7 @@ impl Visit for MessageVisitor { fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { if self.message.is_none() && field.name() == MESSAGE_FIELD { - self.message = Some(format!("{:?}", value)); + self.message = Some(format!("{value:?}")); } } } @@ -577,8 +577,7 @@ mod test { info_span!("span", component_id = &key, vrl_position = &line_number); let _enter = span.enter(); info!( - message = - format!("Hello {} on line_number {}!", key, line_number).as_str(), + message = format!("Hello {key} on line_number {line_number}!").as_str(), internal_log_rate_limit = true ); } @@ -639,8 +638,7 @@ mod test { for key in &["foo", "bar"] { for line_number in &[1, 2] { info!( - message = - format!("Hello {} on line_number {}!", key, line_number).as_str(), + message = format!("Hello {key} on line_number {line_number}!").as_str(), internal_log_rate_limit = true, component_id = &key, vrl_position = &line_number