Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/sinks/aws_cloudwatch_logs/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,34 @@ use crate::aws::is_retriable_error;
use crate::sinks::{aws_cloudwatch_logs::service::CloudwatchError, util::retries::RetryLogic};

#[derive(Debug)]
pub struct CloudwatchRetryLogic<T> {
phantom: PhantomData<T>,
pub struct CloudwatchRetryLogic<Request, Response> {
request: PhantomData<Request>,
response: PhantomData<Response>,
}
impl<T> CloudwatchRetryLogic<T> {
pub const fn new() -> CloudwatchRetryLogic<T> {
impl<Request, Response> CloudwatchRetryLogic<Request, Response> {
pub const fn new() -> CloudwatchRetryLogic<Request, Response> {
CloudwatchRetryLogic {
phantom: PhantomData,
request: PhantomData,
response: PhantomData,
}
}
}

impl<T> Clone for CloudwatchRetryLogic<T> {
impl<Request, Response> Clone for CloudwatchRetryLogic<Request, Response> {
fn clone(&self) -> Self {
CloudwatchRetryLogic {
phantom: PhantomData,
request: PhantomData,
response: PhantomData,
}
}
}

impl<T: Send + Sync + 'static> RetryLogic for CloudwatchRetryLogic<T> {
impl<Request: Send + Sync + 'static, Response: Send + Sync + 'static> RetryLogic
for CloudwatchRetryLogic<Request, Response>
{
type Error = CloudwatchError;
type Response = T;
type Request = Request;
type Response = Response;

// TODO this match may not be necessary given the logic in `is_retriable_error()`
#[allow(clippy::cognitive_complexity)] // long, but just a hair over our limit
Expand Down Expand Up @@ -84,7 +90,7 @@ mod test {

#[test]
fn test_throttle_retry() {
let retry_logic: CloudwatchRetryLogic<()> = CloudwatchRetryLogic::new();
let retry_logic: CloudwatchRetryLogic<(), ()> = CloudwatchRetryLogic::new();

let meta_err = aws_smithy_types::error::ErrorMetadata::builder()
.code("ThrottlingException")
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Svc = Buffer<
<ConcurrencyLimit<
RateLimit<
Retry<
FibonacciRetryPolicy<CloudwatchRetryLogic<()>>,
FibonacciRetryPolicy<CloudwatchRetryLogic<Vec<InputLogEvent>, ()>>,
Buffer<
Vec<InputLogEvent>,
<Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_cloudwatch_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ struct CloudWatchMetricsRetryLogic;

impl RetryLogic for CloudWatchMetricsRetryLogic {
type Error = SdkError<PutMetricDataError>;
type Request = PartitionInnerBuffer<Vec<Metric>, String>;
type Response = ();

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
R: Send + 'static,
RR: Record + Record<T = R> + Clone + Send + Sync + Unpin + 'static,
E: Send + 'static,
RT: RetryLogic<Response = KinesisResponse> + Default,
RT: RetryLogic<Request = BatchKinesisRequest<RR>, Response = KinesisResponse> + Default,
{
let request_limits = config.request.into_settings();

Expand Down
4 changes: 3 additions & 1 deletion src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
},
};

use super::sink::BatchKinesisRequest;
use super::{
build_sink,
record::{KinesisFirehoseClient, KinesisFirehoseRecord},
Expand Down Expand Up @@ -173,6 +174,7 @@ struct KinesisRetryLogic {

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError, HttpResponse>;
type Request = BatchKinesisRequest<KinesisFirehoseRecord>;
type Response = KinesisResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand All @@ -187,7 +189,7 @@ impl RetryLogic for KinesisRetryLogic {
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
},
};

use super::sink::BatchKinesisRequest;
use super::{
build_sink,
record::{KinesisStreamClient, KinesisStreamRecord},
Expand Down Expand Up @@ -170,6 +171,7 @@ struct KinesisRetryLogic {

impl RetryLogic for KinesisRetryLogic {
type Error = SdkError<KinesisError, HttpResponse>;
type Request = BatchKinesisRequest<KinesisStreamRecord>;
type Response = KinesisResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand All @@ -190,7 +192,7 @@ impl RetryLogic for KinesisRetryLogic {
is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
if response.failure_count > 0 && self.retry_partial {
let msg = format!("partial error count {}", response.failure_count);
RetryAction::Retry(msg.into())
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_s_s/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::marker::PhantomData;

use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};

use super::service::SendMessageResponse;
use super::{request_builder::SendMessageEntry, service::SendMessageResponse};
use crate::{aws::is_retriable_error, sinks::util::retries::RetryLogic};

#[derive(Debug)]
Expand All @@ -26,6 +26,7 @@ where
E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
{
type Error = SdkError<E, HttpResponse>;
type Request = SendMessageEntry;
type Response = SendMessageResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/sinks/azure_common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct AzureBlobRetryLogic;

impl RetryLogic for AzureBlobRetryLogic {
type Error = HttpError;
type Request = AzureBlobRequest;
type Response = AzureBlobResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
5 changes: 3 additions & 2 deletions src/sinks/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ use snafu::ResultExt;

#[derive(Debug, Default, Clone)]
pub struct ClickhouseRetryLogic {
inner: HttpRetryLogic,
inner: HttpRetryLogic<HttpRequest<PartitionKey>>,
}

impl RetryLogic for ClickhouseRetryLogic {
type Error = HttpError;
type Request = HttpRequest<PartitionKey>;
type Response = HttpResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
self.inner.is_retriable_error(error)
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
match response.http_response.status() {
StatusCode::INTERNAL_SERVER_ERROR => {
let body = response.http_response.body();
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/clickhouse/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ where

/// PartitionKey used to partition events by (database, table) pair.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub(super) struct PartitionKey {
pub struct PartitionKey {
pub database: String,
pub table: String,
pub format: Format,
Expand Down
1 change: 1 addition & 0 deletions src/sinks/databend/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct DatabendRetryLogic;

impl RetryLogic for DatabendRetryLogic {
type Error = DatabendError;
type Request = DatabendRequest;
type Response = DatabendResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/sinks/datadog/logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct LogApiRetry;

impl RetryLogic for LogApiRetry {
type Error = DatadogApiError;
type Request = LogApiRequest;
type Response = LogApiResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/sinks/datadog/metrics/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct DatadogMetricsRetryLogic;

impl RetryLogic for DatadogMetricsRetryLogic {
type Error = DatadogApiError;
type Request = DatadogMetricsRequest;
type Response = DatadogMetricsResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/datadog/traces/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ pub struct TraceApiRetry;

impl RetryLogic for TraceApiRetry {
type Error = HttpError;
type Request = TraceApiRequest;
type Response = TraceApiResponse;

fn is_retriable_error(&self, _error: &Self::Error) -> bool {
true
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
let status = response.status_code;
match status {
// Use the same status code/retry policy as the Trace agent, additionally retrying
Expand Down
33 changes: 10 additions & 23 deletions src/sinks/elasticsearch/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
http::HttpError,
sinks::{
elasticsearch::service::{ElasticsearchRequest, ElasticsearchResponse},
util::retries::{RetryAction, RetryLogic, RetryPartialFunction},
util::retries::{RetryAction, RetryLogic},
},
};
use http::StatusCode;
Expand Down Expand Up @@ -92,32 +92,19 @@ pub struct ElasticsearchRetryLogic {
pub retry_partial: bool,
}

// construct a closure by EsRetryClosure { closure: Box::new(|req: ElasticsearchRequest| { new_req }) }
struct EsRetryClosure {
closure: Box<dyn Fn(ElasticsearchRequest) -> ElasticsearchRequest + Send + Sync>,
}

impl RetryPartialFunction for EsRetryClosure {
fn modify_request(&self, request: Box<dyn std::any::Any>) -> Box<dyn std::any::Any> {
match request.downcast::<ElasticsearchRequest>() {
Ok(request) => {
let new_request = (self.closure)(*request);
Box::new(new_request)
}
Err(request) => request,
}
}
}

impl RetryLogic for ElasticsearchRetryLogic {
type Error = HttpError;
type Request = ElasticsearchRequest;
type Response = ElasticsearchResponse;

fn is_retriable_error(&self, _error: &Self::Error) -> bool {
true
}

fn should_retry_response(&self, response: &ElasticsearchResponse) -> RetryAction {
fn should_retry_response(
&self,
response: &ElasticsearchResponse,
) -> RetryAction<ElasticsearchRequest> {
let status = response.http_response.status();

match status {
Expand Down Expand Up @@ -160,8 +147,8 @@ impl RetryLogic for ElasticsearchRetryLogic {
|| status.is_server_error()
})
{
return RetryAction::RetryPartial(Box::new(EsRetryClosure {
closure: Box::new(move |req: ElasticsearchRequest| {
return RetryAction::RetryPartial(Box::new(
move |req: ElasticsearchRequest| {
let mut failed_events: Vec<ProcessedEvent> = req
.original_events
.clone()
Expand Down Expand Up @@ -196,8 +183,8 @@ impl RetryLogic for ElasticsearchRetryLogic {
elasticsearch_request_builder: req
.elasticsearch_request_builder,
}
}),
}));
},
));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/gcp/cloud_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl GcsSinkConfig {
let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());

let svc = ServiceBuilder::new()
.settings(request, GcsRetryLogic)
.settings(request, GcsRetryLogic::default())
.service(GcsService::new(client, base_url, auth));

let request_settings = RequestSettings::new(self, cx)?;
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/gcp_chronicle/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl ChronicleUnstructuredConfig {
let partitioner = self.partitioner()?;

let svc = ServiceBuilder::new()
.settings(request, GcsRetryLogic)
.settings(request, GcsRetryLogic::default())
.service(ChronicleService::new(client, base_url, creds));

let request_settings = ChronicleRequestBuilder::new(self)?;
Expand Down
28 changes: 24 additions & 4 deletions src/sinks/gcs_common/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::marker::PhantomData;

use futures::FutureExt;
use http::{StatusCode, Uri};
use hyper::Body;
Expand Down Expand Up @@ -138,19 +140,37 @@ pub fn healthcheck_response(
}
}

#[derive(Clone)]
pub struct GcsRetryLogic;
pub struct GcsRetryLogic<Request> {
request: PhantomData<Request>,
}

impl<Request> Default for GcsRetryLogic<Request> {
fn default() -> Self {
Self {
request: PhantomData,
}
}
}

impl<Request> Clone for GcsRetryLogic<Request> {
fn clone(&self) -> Self {
Self {
request: PhantomData,
}
}
}

// This is a clone of HttpRetryLogic for the Body type, should get merged
impl RetryLogic for GcsRetryLogic {
impl<Request: Clone + Send + Sync + 'static> RetryLogic for GcsRetryLogic<Request> {
type Error = hyper::Error;
type Request = Request;
type Response = GcsResponse;

fn is_retriable_error(&self, _error: &Self::Error) -> bool {
true
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
let status = response.inner.status();

match status {
Expand Down
5 changes: 3 additions & 2 deletions src/sinks/greptimedb/logs/http_request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,19 @@ pub(super) async fn http_healthcheck(
/// GreptimeDB HTTP retry logic.
#[derive(Clone, Default)]
pub(super) struct GreptimeDBHttpRetryLogic {
inner: HttpRetryLogic,
inner: HttpRetryLogic<HttpRequest<PartitionKey>>,
}

impl RetryLogic for GreptimeDBHttpRetryLogic {
type Error = HttpError;
type Request = HttpRequest<PartitionKey>;
type Response = HttpResponse;

fn is_retriable_error(&self, error: &Self::Error) -> bool {
error.is_retriable()
}

fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
self.inner.should_retry_response(&response.http_response)
}
}
Expand Down
Loading
Loading