diff --git a/src/sinks/aws_cloudwatch_logs/retry.rs b/src/sinks/aws_cloudwatch_logs/retry.rs index 86689f4a89178..b2722054de097 100644 --- a/src/sinks/aws_cloudwatch_logs/retry.rs +++ b/src/sinks/aws_cloudwatch_logs/retry.rs @@ -9,28 +9,34 @@ use crate::aws::is_retriable_error; use crate::sinks::{aws_cloudwatch_logs::service::CloudwatchError, util::retries::RetryLogic}; #[derive(Debug)] -pub struct CloudwatchRetryLogic { - phantom: PhantomData, +pub struct CloudwatchRetryLogic { + request: PhantomData, + response: PhantomData, } -impl CloudwatchRetryLogic { - pub const fn new() -> CloudwatchRetryLogic { +impl CloudwatchRetryLogic { + pub const fn new() -> CloudwatchRetryLogic { CloudwatchRetryLogic { - phantom: PhantomData, + request: PhantomData, + response: PhantomData, } } } -impl Clone for CloudwatchRetryLogic { +impl Clone for CloudwatchRetryLogic { fn clone(&self) -> Self { CloudwatchRetryLogic { - phantom: PhantomData, + request: PhantomData, + response: PhantomData, } } } -impl RetryLogic for CloudwatchRetryLogic { +impl RetryLogic + for CloudwatchRetryLogic +{ type Error = CloudwatchError; - type Response = T; + type Request = Request; + type Response = Response; // TODO this match may not be necessary given the logic in `is_retriable_error()` #[allow(clippy::cognitive_complexity)] // long, but just a hair over our limit @@ -84,7 +90,7 @@ mod test { #[test] fn test_throttle_retry() { - let retry_logic: CloudwatchRetryLogic<()> = CloudwatchRetryLogic::new(); + let retry_logic: CloudwatchRetryLogic<(), ()> = CloudwatchRetryLogic::new(); let meta_err = aws_smithy_types::error::ErrorMetadata::builder() .code("ThrottlingException") diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index 52f50678bbd35..4546aa42b4b71 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -50,7 +50,7 @@ type Svc = Buffer< >, + FibonacciRetryPolicy, ()>>, Buffer< Vec, as Service>>::Future, diff --git a/src/sinks/aws_cloudwatch_metrics/mod.rs b/src/sinks/aws_cloudwatch_metrics/mod.rs index c7954d5ffa4dd..d8f7e35aac1c6 100644 --- a/src/sinks/aws_cloudwatch_metrics/mod.rs +++ b/src/sinks/aws_cloudwatch_metrics/mod.rs @@ -201,6 +201,7 @@ struct CloudWatchMetricsRetryLogic; impl RetryLogic for CloudWatchMetricsRetryLogic { type Error = SdkError; + type Request = PartitionInnerBuffer, String>; type Response = (); fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index ce12349698dc1..259129a014a4e 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -99,7 +99,7 @@ where R: Send + 'static, RR: Record + Record + Clone + Send + Sync + Unpin + 'static, E: Send + 'static, - RT: RetryLogic + Default, + RT: RetryLogic, Response = KinesisResponse> + Default, { let request_limits = config.request.into_settings(); diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index ad6323ea21f33..85ebb6034aa54 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -16,6 +16,7 @@ use crate::{ }, }; +use super::sink::BatchKinesisRequest; use super::{ build_sink, record::{KinesisFirehoseClient, KinesisFirehoseRecord}, @@ -173,6 +174,7 @@ struct KinesisRetryLogic { impl RetryLogic for KinesisRetryLogic { type Error = SdkError; + type Request = BatchKinesisRequest; type Response = KinesisResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { @@ -187,7 +189,7 @@ impl RetryLogic for KinesisRetryLogic { is_retriable_error(error) } - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { if response.failure_count > 0 && self.retry_partial { let msg = format!("partial error count {}", response.failure_count); RetryAction::Retry(msg.into()) diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index 5c97f0b019b04..2f42865fe8df8 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -15,6 +15,7 @@ use crate::{ }, }; +use super::sink::BatchKinesisRequest; use super::{ build_sink, record::{KinesisStreamClient, KinesisStreamRecord}, @@ -170,6 +171,7 @@ struct KinesisRetryLogic { impl RetryLogic for KinesisRetryLogic { type Error = SdkError; + type Request = BatchKinesisRequest; type Response = KinesisResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { @@ -190,7 +192,7 @@ impl RetryLogic for KinesisRetryLogic { is_retriable_error(error) } - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { if response.failure_count > 0 && self.retry_partial { let msg = format!("partial error count {}", response.failure_count); RetryAction::Retry(msg.into()) diff --git a/src/sinks/aws_s_s/retry.rs b/src/sinks/aws_s_s/retry.rs index 569c9a083969a..bc9aba3eb58f8 100644 --- a/src/sinks/aws_s_s/retry.rs +++ b/src/sinks/aws_s_s/retry.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError}; -use super::service::SendMessageResponse; +use super::{request_builder::SendMessageEntry, service::SendMessageResponse}; use crate::{aws::is_retriable_error, sinks::util::retries::RetryLogic}; #[derive(Debug)] @@ -26,6 +26,7 @@ where E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static, { type Error = SdkError; + type Request = SendMessageEntry; type Response = SendMessageResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 5e10ea797c305..ca6b1c7dd87fc 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -57,6 +57,7 @@ pub struct AzureBlobRetryLogic; impl RetryLogic for AzureBlobRetryLogic { type Error = HttpError; + type Request = AzureBlobRequest; type Response = AzureBlobResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/clickhouse/service.rs b/src/sinks/clickhouse/service.rs index aa45017a4caef..ca865fa0cf4fd 100644 --- a/src/sinks/clickhouse/service.rs +++ b/src/sinks/clickhouse/service.rs @@ -23,18 +23,19 @@ use snafu::ResultExt; #[derive(Debug, Default, Clone)] pub struct ClickhouseRetryLogic { - inner: HttpRetryLogic, + inner: HttpRetryLogic>, } impl RetryLogic for ClickhouseRetryLogic { type Error = HttpError; + type Request = HttpRequest; type Response = HttpResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { self.inner.is_retriable_error(error) } - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { match response.http_response.status() { StatusCode::INTERNAL_SERVER_ERROR => { let body = response.http_response.body(); diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs index 6e30172ba5a7d..e86f484d89024 100644 --- a/src/sinks/clickhouse/sink.rs +++ b/src/sinks/clickhouse/sink.rs @@ -83,7 +83,7 @@ where /// PartitionKey used to partition events by (database, table) pair. #[derive(Hash, Eq, PartialEq, Clone, Debug)] -pub(super) struct PartitionKey { +pub struct PartitionKey { pub database: String, pub table: String, pub format: Format, diff --git a/src/sinks/databend/service.rs b/src/sinks/databend/service.rs index 88a504e3e6ec7..2069771088fd4 100644 --- a/src/sinks/databend/service.rs +++ b/src/sinks/databend/service.rs @@ -23,6 +23,7 @@ pub struct DatabendRetryLogic; impl RetryLogic for DatabendRetryLogic { type Error = DatabendError; + type Request = DatabendRequest; type Response = DatabendResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/datadog/logs/service.rs b/src/sinks/datadog/logs/service.rs index 4ae96ecb2c51e..0375708d97ad1 100644 --- a/src/sinks/datadog/logs/service.rs +++ b/src/sinks/datadog/logs/service.rs @@ -31,6 +31,7 @@ pub struct LogApiRetry; impl RetryLogic for LogApiRetry { type Error = DatadogApiError; + type Request = LogApiRequest; type Response = LogApiResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/datadog/metrics/service.rs b/src/sinks/datadog/metrics/service.rs index d4106871d507d..c1d0d6e52fc69 100644 --- a/src/sinks/datadog/metrics/service.rs +++ b/src/sinks/datadog/metrics/service.rs @@ -26,6 +26,7 @@ pub struct DatadogMetricsRetryLogic; impl RetryLogic for DatadogMetricsRetryLogic { type Error = DatadogApiError; + type Request = DatadogMetricsRequest; type Response = DatadogMetricsResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/datadog/traces/service.rs b/src/sinks/datadog/traces/service.rs index d561263a72fa9..89f6a56a412db 100644 --- a/src/sinks/datadog/traces/service.rs +++ b/src/sinks/datadog/traces/service.rs @@ -23,13 +23,14 @@ pub struct TraceApiRetry; impl RetryLogic for TraceApiRetry { type Error = HttpError; + type Request = TraceApiRequest; type Response = TraceApiResponse; fn is_retriable_error(&self, _error: &Self::Error) -> bool { true } - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { let status = response.status_code; match status { // Use the same status code/retry policy as the Trace agent, additionally retrying diff --git a/src/sinks/elasticsearch/retry.rs b/src/sinks/elasticsearch/retry.rs index 284ecb60cc2f4..becdcc62ed194 100644 --- a/src/sinks/elasticsearch/retry.rs +++ b/src/sinks/elasticsearch/retry.rs @@ -7,7 +7,7 @@ use crate::{ http::HttpError, sinks::{ elasticsearch::service::{ElasticsearchRequest, ElasticsearchResponse}, - util::retries::{RetryAction, RetryLogic, RetryPartialFunction}, + util::retries::{RetryAction, RetryLogic}, }, }; use http::StatusCode; @@ -92,32 +92,19 @@ pub struct ElasticsearchRetryLogic { pub retry_partial: bool, } -// construct a closure by EsRetryClosure { closure: Box::new(|req: ElasticsearchRequest| { new_req }) } -struct EsRetryClosure { - closure: Box ElasticsearchRequest + Send + Sync>, -} - -impl RetryPartialFunction for EsRetryClosure { - fn modify_request(&self, request: Box) -> Box { - match request.downcast::() { - Ok(request) => { - let new_request = (self.closure)(*request); - Box::new(new_request) - } - Err(request) => request, - } - } -} - impl RetryLogic for ElasticsearchRetryLogic { type Error = HttpError; + type Request = ElasticsearchRequest; type Response = ElasticsearchResponse; fn is_retriable_error(&self, _error: &Self::Error) -> bool { true } - fn should_retry_response(&self, response: &ElasticsearchResponse) -> RetryAction { + fn should_retry_response( + &self, + response: &ElasticsearchResponse, + ) -> RetryAction { let status = response.http_response.status(); match status { @@ -160,8 +147,8 @@ impl RetryLogic for ElasticsearchRetryLogic { || status.is_server_error() }) { - return RetryAction::RetryPartial(Box::new(EsRetryClosure { - closure: Box::new(move |req: ElasticsearchRequest| { + return RetryAction::RetryPartial(Box::new( + move |req: ElasticsearchRequest| { let mut failed_events: Vec = req .original_events .clone() @@ -196,8 +183,8 @@ impl RetryLogic for ElasticsearchRetryLogic { elasticsearch_request_builder: req .elasticsearch_request_builder, } - }), - })); + }, + )); } } diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 933419d638746..4004565fb2e31 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -276,7 +276,7 @@ impl GcsSinkConfig { let protocol = get_http_scheme_from_uri(&base_url.parse::().unwrap()); let svc = ServiceBuilder::new() - .settings(request, GcsRetryLogic) + .settings(request, GcsRetryLogic::default()) .service(GcsService::new(client, base_url, auth)); let request_settings = RequestSettings::new(self, cx)?; diff --git a/src/sinks/gcp_chronicle/chronicle_unstructured.rs b/src/sinks/gcp_chronicle/chronicle_unstructured.rs index 8727e789c137b..dcd775c87f9f3 100644 --- a/src/sinks/gcp_chronicle/chronicle_unstructured.rs +++ b/src/sinks/gcp_chronicle/chronicle_unstructured.rs @@ -347,7 +347,7 @@ impl ChronicleUnstructuredConfig { let partitioner = self.partitioner()?; let svc = ServiceBuilder::new() - .settings(request, GcsRetryLogic) + .settings(request, GcsRetryLogic::default()) .service(ChronicleService::new(client, base_url, creds)); let request_settings = ChronicleRequestBuilder::new(self)?; diff --git a/src/sinks/gcs_common/config.rs b/src/sinks/gcs_common/config.rs index 7c88c3c0990f5..65ee3794cc6ad 100644 --- a/src/sinks/gcs_common/config.rs +++ b/src/sinks/gcs_common/config.rs @@ -1,3 +1,5 @@ +use std::marker::PhantomData; + use futures::FutureExt; use http::{StatusCode, Uri}; use hyper::Body; @@ -138,19 +140,37 @@ pub fn healthcheck_response( } } -#[derive(Clone)] -pub struct GcsRetryLogic; +pub struct GcsRetryLogic { + request: PhantomData, +} + +impl Default for GcsRetryLogic { + fn default() -> Self { + Self { + request: PhantomData, + } + } +} + +impl Clone for GcsRetryLogic { + fn clone(&self) -> Self { + Self { + request: PhantomData, + } + } +} // This is a clone of HttpRetryLogic for the Body type, should get merged -impl RetryLogic for GcsRetryLogic { +impl RetryLogic for GcsRetryLogic { type Error = hyper::Error; + type Request = Request; type Response = GcsResponse; fn is_retriable_error(&self, _error: &Self::Error) -> bool { true } - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { let status = response.inner.status(); match status { diff --git a/src/sinks/greptimedb/logs/http_request_builder.rs b/src/sinks/greptimedb/logs/http_request_builder.rs index 8f888ec4c0dd7..62672e56faf56 100644 --- a/src/sinks/greptimedb/logs/http_request_builder.rs +++ b/src/sinks/greptimedb/logs/http_request_builder.rs @@ -239,18 +239,19 @@ pub(super) async fn http_healthcheck( /// GreptimeDB HTTP retry logic. #[derive(Clone, Default)] pub(super) struct GreptimeDBHttpRetryLogic { - inner: HttpRetryLogic, + inner: HttpRetryLogic>, } impl RetryLogic for GreptimeDBHttpRetryLogic { type Error = HttpError; + type Request = HttpRequest; type Response = HttpResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { error.is_retriable() } - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { self.inner.should_retry_response(&response.http_response) } } diff --git a/src/sinks/greptimedb/metrics/request.rs b/src/sinks/greptimedb/metrics/request.rs index 1a7cd8556b6eb..082c6c6709b54 100644 --- a/src/sinks/greptimedb/metrics/request.rs +++ b/src/sinks/greptimedb/metrics/request.rs @@ -13,7 +13,7 @@ use vector_lib::event::Metric; /// that is used to send metrics to GreptimeDB. /// It also contains the finalizers and metadata that are used to #[derive(Clone)] -pub(super) struct GreptimeDBGrpcRequest { +pub struct GreptimeDBGrpcRequest { pub(super) items: RowInsertRequests, pub(super) finalizers: EventFinalizers, pub(super) metadata: RequestMetadata, @@ -90,8 +90,9 @@ impl DriverResponse for GreptimeDBGrpcBatchOutput { pub struct GreptimeDBGrpcRetryLogic; impl RetryLogic for GreptimeDBGrpcRetryLogic { - type Response = GreptimeDBGrpcBatchOutput; type Error = GreptimeError; + type Request = GreptimeDBGrpcRequest; + type Response = GreptimeDBGrpcBatchOutput; fn is_retriable_error(&self, error: &Self::Error) -> bool { error.is_retriable() diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index fd21c1025d2fe..81ad81f4fc1ea 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -167,7 +167,7 @@ impl InfluxDbSvc { let sink = request .batch_sink( - HttpRetryLogic, + HttpRetryLogic::default(), influxdb_http_service, MetricsBuffer::new(batch.size), batch.timeout, diff --git a/src/sinks/loki/service.rs b/src/sinks/loki/service.rs index 2a4c33280732f..3fbbf7deb988f 100644 --- a/src/sinks/loki/service.rs +++ b/src/sinks/loki/service.rs @@ -15,6 +15,7 @@ pub struct LokiRetryLogic; impl RetryLogic for LokiRetryLogic { type Error = LokiError; + type Request = LokiRequest; type Response = LokiResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs index bfec91a999ae7..bb58b58b9192b 100644 --- a/src/sinks/nats/sink.rs +++ b/src/sinks/nats/sink.rs @@ -6,7 +6,7 @@ use crate::sinks::prelude::*; use super::{ config::{NatsHeaderConfig, NatsPublisher, NatsSinkConfig, NatsTowerRequestConfigDefaults}, - request_builder::{NatsEncoder, NatsRequestBuilder}, + request_builder::{NatsEncoder, NatsRequest, NatsRequestBuilder}, service::{NatsResponse, NatsService}, EncodingSnafu, NatsError, }; @@ -109,6 +109,7 @@ pub(super) struct NatsRetryLogic; impl RetryLogic for NatsRetryLogic { type Error = NatsError; + type Request = NatsRequest; type Response = NatsResponse; fn is_retriable_error(&self, _error: &Self::Error) -> bool { diff --git a/src/sinks/new_relic/config.rs b/src/sinks/new_relic/config.rs index d77dacf34dafa..bc7a25e141388 100644 --- a/src/sinks/new_relic/config.rs +++ b/src/sinks/new_relic/config.rs @@ -5,8 +5,8 @@ use tower::ServiceBuilder; use vector_lib::sensitive_string::SensitiveString; use super::{ - healthcheck, NewRelicApiResponse, NewRelicApiService, NewRelicEncoder, NewRelicSink, - NewRelicSinkError, + healthcheck, service::NewRelicApiRequest, NewRelicApiResponse, NewRelicApiService, + NewRelicEncoder, NewRelicSink, NewRelicSinkError, }; use crate::{http::HttpClient, sinks::prelude::*}; @@ -56,6 +56,7 @@ pub struct NewRelicApiRetry; impl RetryLogic for NewRelicApiRetry { type Error = NewRelicSinkError; + type Request = NewRelicApiRequest; type Response = NewRelicApiResponse; fn is_retriable_error(&self, _error: &Self::Error) -> bool { diff --git a/src/sinks/postgres/service.rs b/src/sinks/postgres/service.rs index d1edcdce16505..54389d6a66c88 100644 --- a/src/sinks/postgres/service.rs +++ b/src/sinks/postgres/service.rs @@ -21,6 +21,7 @@ pub struct PostgresRetryLogic; impl RetryLogic for PostgresRetryLogic { type Error = PostgresServiceError; + type Request = PostgresRequest; type Response = PostgresResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index 1df2c37794714..aaea351fece3e 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -18,7 +18,7 @@ use super::{ config::{DataTypeConfig, RedisSinkConfig, RedisTowerRequestConfigDefaults}, request_builder::request_builder, service::{RedisResponse, RedisService}, - RedisEvent, RepairChannelSnafu, + RedisEvent, RedisRequest, RepairChannelSnafu, }; pub(super) type GenerationCount = u64; @@ -377,6 +377,7 @@ pub(super) struct RedisRetryLogic { impl RetryLogic for RedisRetryLogic { type Error = RedisSinkError; + type Request = RedisRequest; type Response = RedisResponse; fn is_retriable_error(&self, _error: &Self::Error) -> bool { @@ -396,7 +397,7 @@ impl RetryLogic for RedisRetryLogic { } } - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { if response.is_successful() { RetryAction::Successful } else { diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index f4dd5cce7a365..3d3caac50fc0d 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -13,7 +13,7 @@ use futures::FutureExt; use snafu::Snafu; use vector_lib::configurable::configurable_component; -use super::service::{S3Response, S3Service}; +use super::service::{S3Request, S3Response, S3Service}; use crate::{ aws::{create_client, is_retriable_error, AwsAuthentication, RegionOrEndpoint}, common::s3::S3ClientBuilder, @@ -360,6 +360,7 @@ pub enum RetryStrategy { impl RetryLogic for RetryStrategy { type Error = SdkError; + type Request = S3Request; type Response = S3Response; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/sematext/metrics.rs b/src/sinks/sematext/metrics.rs index 88cc3fa5d0818..64941c3571666 100644 --- a/src/sinks/sematext/metrics.rs +++ b/src/sinks/sematext/metrics.rs @@ -176,7 +176,7 @@ impl SematextMetricsService { let sink = request .batch_sink( - HttpRetryLogic, + HttpRetryLogic::default(), sematext_service, MetricsBuffer::new(batch.size), batch.timeout, diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 7479a58d0b2ba..000b26b21de05 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -241,7 +241,7 @@ impl HecLogsSinkConfig { self.compression, )); let http_service = ServiceBuilder::new() - .settings(request_settings, HttpRetryLogic) + .settings(request_settings, HttpRetryLogic::default()) .service(build_http_batch_service( client, Arc::clone(&http_request_builder), diff --git a/src/sinks/splunk_hec/metrics/config.rs b/src/sinks/splunk_hec/metrics/config.rs index 521e88daf5b18..38cb8470d0f90 100644 --- a/src/sinks/splunk_hec/metrics/config.rs +++ b/src/sinks/splunk_hec/metrics/config.rs @@ -185,7 +185,7 @@ impl HecMetricsSinkConfig { self.compression, )); let http_service = ServiceBuilder::new() - .settings(request_settings, HttpRetryLogic) + .settings(request_settings, HttpRetryLogic::default()) .service(build_http_batch_service( client, Arc::clone(&http_request_builder), diff --git a/src/sinks/util/adaptive_concurrency/service.rs b/src/sinks/util/adaptive_concurrency/service.rs index 6aa929f62fe15..5f9861c35ed4a 100644 --- a/src/sinks/util/adaptive_concurrency/service.rs +++ b/src/sinks/util/adaptive_concurrency/service.rs @@ -156,6 +156,7 @@ mod tests { struct TestRetryLogic; impl RetryLogic for TestRetryLogic { type Error = TestError; + type Request = (); type Response = String; fn is_retriable_error(&self, _error: &Self::Error) -> bool { true diff --git a/src/sinks/util/adaptive_concurrency/tests.rs b/src/sinks/util/adaptive_concurrency/tests.rs index 0b209aca85c15..e22509b7e73b5 100644 --- a/src/sinks/util/adaptive_concurrency/tests.rs +++ b/src/sinks/util/adaptive_concurrency/tests.rs @@ -321,6 +321,7 @@ enum Error { struct TestRetryLogic; impl RetryLogic for TestRetryLogic { + type Request = Vec; type Response = Response; type Error = Error; diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 163417c667e1e..4bbe767a3074d 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -109,12 +109,14 @@ pub trait HttpSink: Send + Sync + 'static { /// /// Note: This has been deprecated, please do not use when creating new Sinks. #[pin_project] -pub struct BatchedHttpSink +pub struct BatchedHttpSink::Output>> where B: Batch, - B::Output: ByteSizeOf + Clone + Send + 'static, + B::Output: ByteSizeOf + Clone + Sync + Send + 'static, T: HttpSink, - RL: RetryLogic> + Send + 'static, + RL: RetryLogic::Output, Response = http::Response> + + Send + + 'static, { sink: Arc, #[pin] @@ -133,7 +135,7 @@ where impl BatchedHttpSink where B: Batch, - B::Output: ByteSizeOf + Clone + Send + 'static, + B::Output: ByteSizeOf + Clone + Sync + Send + 'static, T: HttpSink, { pub fn new( @@ -146,7 +148,7 @@ where Self::with_logic( sink, batch, - HttpRetryLogic, + HttpRetryLogic::default(), request_settings, batch_timeout, client, @@ -157,8 +159,10 @@ where impl BatchedHttpSink where B: Batch, - B::Output: ByteSizeOf + Clone + Send + 'static, - RL: RetryLogic, Error = HttpError> + Send + 'static, + B::Output: ByteSizeOf + Clone + Sync + Send + 'static, + RL: RetryLogic, Error = HttpError> + + Send + + 'static, T: HttpSink, { pub fn with_logic( @@ -193,9 +197,11 @@ where impl Sink for BatchedHttpSink where B: Batch, - B::Output: ByteSizeOf + Clone + Send + 'static, + B::Output: ByteSizeOf + Clone + Sync + Send + 'static, T: HttpSink, - RL: RetryLogic> + Send + 'static, + RL: RetryLogic::Output, Response = http::Response> + + Send + + 'static, { type Error = crate::Error; @@ -249,14 +255,14 @@ where /// Note: This has been deprecated, please do not use when creating new Sinks. #[pin_project] -pub struct PartitionHttpSink +pub struct PartitionHttpSink::Output>> where B: Batch, - B::Output: ByteSizeOf + Clone + Send + 'static, + B::Output: ByteSizeOf + Clone + Sync + Send + 'static, B::Input: Partition, K: Hash + Eq + Clone + Send + 'static, T: HttpSink, - RL: RetryLogic> + Send + 'static, + RL: RetryLogic> + Send + 'static, { sink: Arc, #[pin] @@ -270,10 +276,10 @@ where slot: Option>, } -impl PartitionHttpSink +impl PartitionHttpSink::Output>> where B: Batch, - B::Output: ByteSizeOf + Clone + Send + 'static, + B::Output: ByteSizeOf + Clone + Sync + Send + 'static, B::Input: Partition, K: Hash + Eq + Clone + Send + 'static, T: HttpSink, @@ -288,7 +294,7 @@ where Self::with_retry_logic( sink, batch, - HttpRetryLogic, + HttpRetryLogic::default(), request_settings, batch_timeout, client, @@ -299,11 +305,13 @@ where impl PartitionHttpSink where B: Batch, - B::Output: ByteSizeOf + Clone + Send + 'static, + B::Output: ByteSizeOf + Clone + Sync + Send + 'static, B::Input: Partition, K: Hash + Eq + Clone + Send + 'static, T: HttpSink, - RL: RetryLogic, Error = HttpError> + Send + 'static, + RL: RetryLogic, Error = HttpError> + + Send + + 'static, { pub fn with_retry_logic( sink: T, @@ -343,11 +351,11 @@ where impl Sink for PartitionHttpSink where B: Batch, - B::Output: ByteSizeOf + Clone + Send + 'static, + B::Output: ByteSizeOf + Clone + Sync + Send + 'static, B::Input: Partition, K: Hash + Eq + Clone + Send + 'static, T: HttpSink, - RL: RetryLogic> + Send + 'static, + RL: RetryLogic> + Send + 'static, { type Error = crate::Error; @@ -541,18 +549,28 @@ impl sink::Response for http::Response { } } -#[derive(Debug, Default, Clone)] -pub struct HttpRetryLogic; +#[derive(Debug, Clone)] +pub struct HttpRetryLogic { + request: PhantomData, +} +impl Default for HttpRetryLogic { + fn default() -> Self { + Self { + request: PhantomData, + } + } +} -impl RetryLogic for HttpRetryLogic { +impl RetryLogic for HttpRetryLogic { type Error = HttpError; + type Request = Req; type Response = hyper::Response; fn is_retriable_error(&self, _error: &Self::Error) -> bool { true } - fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { let status = response.status(); match status { @@ -573,37 +591,42 @@ impl RetryLogic for HttpRetryLogic { /// A more generic version of `HttpRetryLogic` that accepts anything that can be converted /// to a status code #[derive(Debug)] -pub struct HttpStatusRetryLogic { +pub struct HttpStatusRetryLogic { func: F, - request: PhantomData, + request: PhantomData, + response: PhantomData, } -impl HttpStatusRetryLogic +impl HttpStatusRetryLogic where - F: Fn(&T) -> StatusCode + Clone + Send + Sync + 'static, - T: Send + Sync + 'static, + F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static, + Req: Send + Sync + 'static, + Res: Send + Sync + 'static, { - pub const fn new(func: F) -> HttpStatusRetryLogic { + pub const fn new(func: F) -> HttpStatusRetryLogic { HttpStatusRetryLogic { func, request: PhantomData, + response: PhantomData, } } } -impl RetryLogic for HttpStatusRetryLogic +impl RetryLogic for HttpStatusRetryLogic where - F: Fn(&T) -> StatusCode + Clone + Send + Sync + 'static, - T: Send + Sync + 'static, + F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static, + Req: Send + Sync + 'static, + Res: Send + Sync + 'static, { type Error = HttpError; - type Response = T; + type Request = Req; + type Response = Res; fn is_retriable_error(&self, _error: &Self::Error) -> bool { true } - fn should_retry_response(&self, response: &T) -> RetryAction { + fn should_retry_response(&self, response: &Res) -> RetryAction { let status = (self.func)(response); match status { @@ -621,7 +644,7 @@ where } } -impl Clone for HttpStatusRetryLogic +impl Clone for HttpStatusRetryLogic where F: Clone, { @@ -629,6 +652,7 @@ where Self { func: self.func.clone(), request: PhantomData, + response: PhantomData, } } } @@ -719,7 +743,7 @@ pub fn validate_headers( } /// Request type for use in the `Service` implementation of HTTP stream sinks. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct HttpRequest { payload: Bytes, finalizers: EventFinalizers, @@ -802,8 +826,9 @@ impl DriverResponse for HttpResponse { } /// Creates a `RetryLogic` for use with `HttpResponse`. -pub fn http_response_retry_logic() -> HttpStatusRetryLogic< +pub fn http_response_retry_logic() -> HttpStatusRetryLogic< impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static, + Request, HttpResponse, > { HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()) @@ -928,7 +953,7 @@ mod test { #[test] fn util_http_retry_logic() { - let logic = HttpRetryLogic; + let logic = HttpRetryLogic::<()>::default(); let response_408 = Response::builder().status(408).body(Bytes::new()).unwrap(); let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap(); diff --git a/src/sinks/util/retries.rs b/src/sinks/util/retries.rs index e596268290c5d..9daaec01ca5b4 100644 --- a/src/sinks/util/retries.rs +++ b/src/sinks/util/retries.rs @@ -14,23 +14,20 @@ use vector_lib::configurable::configurable_component; use crate::Error; -pub enum RetryAction { +pub enum RetryAction { /// Indicate that this request should be retried with a reason Retry(Cow<'static, str>), /// Indicate that a portion of this request should be retried with a generic function - RetryPartial(Box), + RetryPartial(Box Request + Send + Sync>), /// Indicate that this request should not be retried with a reason DontRetry(Cow<'static, str>), /// Indicate that this request should not be retried but the request was successful Successful, } -pub trait RetryPartialFunction { - fn modify_request(&self, request: Box) -> Box; -} - pub trait RetryLogic: Clone + Send + Sync + 'static { type Error: std::error::Error + Send + Sync + 'static; + type Request; type Response; /// When the Service call returns an `Err` response, this function allows @@ -43,7 +40,7 @@ pub trait RetryLogic: Clone + Send + Sync + 'static { /// of a sink returns a transport protocol layer success but error data in the /// response body. For example, an HTTP 200 status, but the body of the response /// contains a list of errors encountered while processing. - fn should_retry_response(&self, _response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, _response: &Self::Response) -> RetryAction { // Treat the default as the request is successful RetryAction::Successful } @@ -140,8 +137,8 @@ impl FibonacciRetryPolicy { impl Policy for FibonacciRetryPolicy where - Req: Clone + 'static, - L: RetryLogic, + Req: Clone + Send + 'static, + L: RetryLogic, { type Future = RetryPolicyFuture; @@ -162,8 +159,7 @@ where warn!(message = "Retrying after response.", reason = %reason, internal_log_rate_limit = true); Some(self.build_retry()) } - - RetryAction::RetryPartial(rebuild_request_fn) => { + RetryAction::RetryPartial(modify_request) => { if self.remaining_attempts == 0 { error!( message = @@ -172,25 +168,13 @@ where ); return None; } - let output = rebuild_request_fn.modify_request(Box::new(req.clone())); - if let Ok(output) = output.downcast::() { - *req = *output; - error!( - message = "OK/retrying partial after response.", - internal_log_rate_limit = true - ); - Some(self.build_retry()) - } else { - // unlikely to go here. - error!( - message = - "OK/retry response but invalid request; dropping the request.", - internal_log_rate_limit = true, - ); - None - } + *req = modify_request(req.clone()); + error!( + message = "OK/retrying partial after response.", + internal_log_rate_limit = true + ); + Some(self.build_retry()) } - RetryAction::DontRetry(reason) => { error!(message = "Not retriable; dropping the request.", reason = ?reason, internal_log_rate_limit = true); None @@ -252,7 +236,7 @@ impl Future for RetryPolicyFuture { } } -impl RetryAction { +impl RetryAction { pub const fn is_retryable(&self) -> bool { matches!(self, RetryAction::Retry(_) | RetryAction::RetryPartial(_)) } @@ -439,6 +423,7 @@ mod tests { impl RetryLogic for SvcRetryLogic { type Error = Error; + type Request = &'static str; type Response = &'static str; fn is_retriable_error(&self, error: &Self::Error) -> bool { diff --git a/src/sinks/util/service.rs b/src/sinks/util/service.rs index ab5160b8741ce..61fa9224a930a 100644 --- a/src/sinks/util/service.rs +++ b/src/sinks/util/service.rs @@ -269,7 +269,7 @@ impl TowerRequestSettings { batch_timeout: Duration, ) -> TowerPartitionSink where - RL: RetryLogic, + RL: RetryLogic::Output, Response = S::Response>, S: Service + Clone + Send + 'static, S::Error: Into + Send + Sync + 'static, S::Response: Send + Response, @@ -294,7 +294,7 @@ impl TowerRequestSettings { batch_timeout: Duration, ) -> TowerBatchedSink where - RL: RetryLogic, + RL: RetryLogic::Output, Response = S::Response>, S: Service + Clone + Send + 'static, S::Error: Into + Send + Sync + 'static, S::Response: Send + Response, @@ -537,7 +537,7 @@ mod tests { let svc = { let sent_requests = Arc::clone(&sent_requests); let delay = Arc::new(AtomicBool::new(true)); - tower::service_fn(move |req: PartitionInnerBuffer<_, _>| { + tower::service_fn(move |req: PartitionInnerBuffer, Vec>| { let (req, _) = req.into_parts(); if delay.swap(false, AcqRel) { // Error on first request @@ -561,7 +561,7 @@ mod tests { ); sink.ordered(); - let input = (0..20).map(|i| PartitionInnerBuffer::new(i, 0)); + let input = (0..20).map(|i| PartitionInnerBuffer::new(i, vec![0])); sink.sink_map_err(drop) .send_all( &mut stream::iter(input) @@ -582,13 +582,14 @@ mod tests { impl RetryLogic for RetryAlways { type Error = std::io::Error; + type Request = PartitionInnerBuffer, Vec>; type Response = (); fn is_retriable_error(&self, _: &Self::Error) -> bool { true } - fn should_retry_response(&self, _response: &Self::Response) -> RetryAction { + fn should_retry_response(&self, _response: &Self::Response) -> RetryAction { // Treat the default as the request is successful RetryAction::Successful } diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index 28d857dae42da..d35daeb0dd150 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -7,7 +7,7 @@ use tower::ServiceBuilder; use vector_lib::configurable::configurable_component; use super::{ - service::{VectorResponse, VectorService}, + service::{VectorRequest, VectorResponse, VectorService}, sink::VectorSink, VectorSinkError, }; @@ -220,6 +220,7 @@ struct VectorGrpcRetryLogic; impl RetryLogic for VectorGrpcRetryLogic { type Error = VectorSinkError; + type Request = VectorRequest; type Response = VectorResponse; fn is_retriable_error(&self, err: &Self::Error) -> bool {