diff --git a/changelog.d/elasticsearch_sink_skip_retrying_succeeded_documents.feature.md b/changelog.d/elasticsearch_sink_skip_retrying_succeeded_documents.feature.md
new file mode 100644
index 0000000000000..57e51daf395cd
--- /dev/null
+++ b/changelog.d/elasticsearch_sink_skip_retrying_succeeded_documents.feature.md
@@ -0,0 +1,3 @@
+The `request_retry_partial` behavior of the `elasticsearch` sink has changed: only the requests in a bulk that failed with a retriable error are now retried, instead of every request in the bulk.
+
+authors: Serendo
diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs
index e4f1ace676181..1e3b4b354c4e4 100644
--- a/src/sinks/elasticsearch/encoder.rs
+++ b/src/sinks/elasticsearch/encoder.rs
@@ -19,7 +19,7 @@ use crate::{
     },
 };
 
-#[derive(Serialize)]
+#[derive(Serialize, Clone, Debug)]
 pub enum DocumentVersionType {
     External,
     ExternalGte,
@@ -34,20 +34,20 @@ impl DocumentVersionType {
     }
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, Clone, Debug)]
 pub struct DocumentVersion {
     pub kind: DocumentVersionType,
     pub value: u64,
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, Clone, Debug)]
 pub enum DocumentMetadata {
     WithoutId,
     Id(String),
     IdAndVersion(String, DocumentVersion),
 }
 
-#[derive(Serialize)]
+#[derive(Serialize, Clone, Debug)]
 pub struct ProcessedEvent {
     pub index: String,
     pub bulk_action: BulkAction,
diff --git a/src/sinks/elasticsearch/request_builder.rs b/src/sinks/elasticsearch/request_builder.rs
index 9d80539a50931..5314c78527f48 100644
--- a/src/sinks/elasticsearch/request_builder.rs
+++ b/src/sinks/elasticsearch/request_builder.rs
@@ -26,6 +26,7 @@ pub struct Metadata {
     finalizers: EventFinalizers,
     batch_size: usize,
    events_byte_size: JsonSize,
+    original_events: Vec<ProcessedEvent>,
 }
 
 impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
@@ -60,6 +61,7 @@ impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
             finalizers: events.take_finalizers(),
             batch_size: events.len(),
             events_byte_size,
+            original_events: events.clone(),
         };
         (es_metadata, metadata_builder, events)
     }
@@ -76,6 +78,8 @@ impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
             batch_size: es_metadata.batch_size,
             events_byte_size: es_metadata.events_byte_size,
             metadata,
+            original_events: es_metadata.original_events,
+            elasticsearch_request_builder: self.clone(),
         }
     }
 }
diff --git a/src/sinks/elasticsearch/retry.rs b/src/sinks/elasticsearch/retry.rs
index 7cf2e6c51f14a..284ecb60cc2f4 100644
--- a/src/sinks/elasticsearch/retry.rs
+++ b/src/sinks/elasticsearch/retry.rs
@@ -1,13 +1,19 @@
-use http::StatusCode;
-use serde::Deserialize;
-
+use crate::sinks::{
+    elasticsearch::encoder::ProcessedEvent,
+    util::{metadata::RequestMetadataBuilder, request_builder::RequestBuilder},
+};
 use crate::{
+    event::Finalizable,
     http::HttpError,
     sinks::{
-        elasticsearch::service::ElasticsearchResponse,
-        util::retries::{RetryAction, RetryLogic},
+        elasticsearch::service::{ElasticsearchRequest, ElasticsearchResponse},
+        util::retries::{RetryAction, RetryLogic, RetryPartialFunction},
     },
 };
+use http::StatusCode;
+use serde::Deserialize;
+use vector_lib::json_size::JsonSize;
+use vector_lib::EstimatedJsonEncodedSizeOf;
 
 #[derive(Deserialize, Debug)]
 struct EsResultResponse {
@@ -86,6 +92,23 @@ pub struct ElasticsearchRetryLogic {
     pub retry_partial: bool,
 }
 
+// Construct one as EsRetryClosure { closure: Box::new(|req: ElasticsearchRequest| { new_req }) },
+// where the closure rebuilds the request that should be retried.
+struct EsRetryClosure {
+    closure: Box<dyn Fn(ElasticsearchRequest) -> ElasticsearchRequest + Send + Sync>,
+}
+
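+// The retry machinery in `util::retries` is generic over the request type, so the
+// rebuild function crosses that boundary type-erased: `modify_request` receives the
+// request as `Box<dyn Any>`, downcasts it back to `ElasticsearchRequest`, applies the
+// stored closure, and re-boxes the result. On a failed downcast it returns the input
+// unchanged, so the full request is retried as-is.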
+impl RetryPartialFunction for EsRetryClosure {
+    fn modify_request(&self, request: Box<dyn std::any::Any>) -> Box<dyn std::any::Any> {
+        match request.downcast::<ElasticsearchRequest>() {
+            Ok(request) => {
+                let new_request = (self.closure)(*request);
+                Box::new(new_request)
+            }
+            Err(request) => request,
+        }
+    }
+}
+
 impl RetryLogic for ElasticsearchRetryLogic {
     type Error = HttpError;
     type Response = ElasticsearchResponse;
@@ -124,21 +147,57 @@ impl RetryLogic for ElasticsearchRetryLogic {
                 // We will retry if there exists at least one item that
                 // failed with a retriable error.
                 // Those are backpressure and server errors.
-                if let Some((status, error)) =
+                // One retriable flag per bulk item, in response order.
+                let status_codes: Vec<bool> = resp
+                    .iter_status()
+                    .map(|(status, _)| {
+                        status == StatusCode::TOO_MANY_REQUESTS
+                            || status.is_server_error()
+                    })
+                    .collect();
+                if let Some((_status, _error)) =
                     resp.iter_status().find(|(status, _)| {
                         *status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()
                     })
                 {
-                    let msg = if let Some(error) = error {
-                        format!(
-                            "partial error, status: {}, error type: {}, reason: {}",
-                            status, error.err_type, error.reason
-                        )
-                    } else {
-                        format!("partial error, status: {status}")
-                    };
-                    return RetryAction::Retry(msg.into());
+                    return RetryAction::RetryPartial(Box::new(EsRetryClosure {
+                        closure: Box::new(move |req: ElasticsearchRequest| {
+                            // Keep only the events whose per-item status was retriable.
+                            let mut failed_events: Vec<ProcessedEvent> = req
+                                .original_events
+                                .clone()
+                                .into_iter()
+                                .zip(status_codes.iter())
+                                .filter(|(_, &flag)| flag)
+                                .map(|(item, _)| item)
+                                .collect();
+                            let finalizers = failed_events.take_finalizers();
+                            let batch_size = failed_events.len();
+                            let events_byte_size = failed_events
+                                .iter()
+                                .map(|x| x.log.estimated_json_encoded_size_of())
+                                .fold(JsonSize::zero(), |a, b| a + b);
+                            let encode_result = match req
+                                .elasticsearch_request_builder
+                                .encode_events(failed_events.clone())
+                            {
+                                Ok(s) => s,
+                                // If re-encoding fails, retry the original request unchanged.
+                                Err(_) => return req,
+                            };
+                            let metadata_builder =
+                                RequestMetadataBuilder::from_events(&failed_events);
+                            let metadata = metadata_builder.build(&encode_result);
+                            ElasticsearchRequest {
+                                payload: encode_result.into_payload(),
+                                finalizers,
+                                batch_size,
+                                events_byte_size,
+                                metadata,
+                                original_events: failed_events,
+                                elasticsearch_request_builder: req
+                                    .elasticsearch_request_builder,
+                            }
+                        }),
+                    }));
                 }
             }
 
@@ -201,7 +260,7 @@ mod tests {
                 event_status: EventStatus::Errored,
                 events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
             }),
-            RetryAction::Retry(_)
+            RetryAction::RetryPartial(_)
         ));
     }
 
diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs
index b7be9dcda0ead..61066783335e2 100644
--- a/src/sinks/elasticsearch/service.rs
+++ b/src/sinks/elasticsearch/service.rs
@@ -19,10 +19,13 @@ use super::{ElasticsearchCommon, ElasticsearchConfig};
 use crate::{
     event::{EventFinalizers, EventStatus, Finalizable},
     http::HttpClient,
-    sinks::util::{
-        auth::Auth,
-        http::{HttpBatchService, RequestConfig},
-        Compression, ElementCount,
+    sinks::{
+        elasticsearch::{encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder},
+        util::{
+            auth::Auth,
+            http::{HttpBatchService, RequestConfig},
+            Compression, ElementCount,
+        },
     },
 };
 
@@ -33,6 +36,8 @@ pub struct ElasticsearchRequest {
     pub batch_size: usize,
     pub events_byte_size: JsonSize,
     pub metadata: RequestMetadata,
+    pub original_events: Vec<ProcessedEvent>, // store the original events so the request can be rebuilt when retrying
+    pub elasticsearch_request_builder: ElasticsearchRequestBuilder,
 }
 
 impl ByteSizeOf for ElasticsearchRequest {
diff --git a/src/sinks/util/retries.rs b/src/sinks/util/retries.rs
index 4198231ff4d36..e596268290c5d 100644
--- a/src/sinks/util/retries.rs
+++ b/src/sinks/util/retries.rs
@@ -17,12 +17,18 @@ use crate::Error;
 pub enum RetryAction {
     /// Indicate that this request should be retried with a reason
     Retry(Cow<'static, str>),
+    /// Indicate that a portion of this request should be retried with a generic function
+    RetryPartial(Box<dyn RetryPartialFunction + Send + Sync>),
     /// Indicate that this request should not be retried with a reason
     DontRetry(Cow<'static, str>),
     /// Indicate that this request should not be retried but the request was successful
     Successful,
 }
 
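+/// Rebuilds a request so that only its failed portion is retried. The retry policy is
+/// generic over the request type, so implementors (see `EsRetryClosure` in the
+/// elasticsearch sink) receive the request type-erased as `Box<dyn Any>`, downcast it,
+/// and return the rebuilt request re-boxed; returning the input unchanged is the
+/// expected fallback when the downcast fails.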
+pub trait RetryPartialFunction {
+    fn modify_request(&self, request: Box<dyn std::any::Any>) -> Box<dyn std::any::Any>;
+}
+
 pub trait RetryLogic: Clone + Send + Sync + 'static {
     type Error: std::error::Error + Send + Sync + 'static;
     type Response;
@@ -141,7 +147,7 @@ where
 
     // NOTE: in the error cases- `Error` and `EventsDropped` internal events are emitted by the
     // driver, so only need to log here.
-    fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
+    fn retry(&mut self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
         match result {
             Ok(response) => match self.logic.should_retry_response(response) {
                 RetryAction::Retry(reason) => {
@@ -157,6 +163,34 @@ where
                     Some(self.build_retry())
                 }
+                RetryAction::RetryPartial(rebuild_request_fn) => {
+                    if self.remaining_attempts == 0 {
+                        error!(
+                            message =
+                                "OK/retry response but retries exhausted; dropping the request.",
+                            internal_log_rate_limit = true,
+                        );
+                        return None;
+                    }
+                    let output = rebuild_request_fn.modify_request(Box::new(req.clone()));
+                    if let Ok(output) = output.downcast::<Req>() {
+                        *req = *output;
+                        error!(
+                            message = "OK/retrying partial after response.",
+                            internal_log_rate_limit = true
+                        );
+                        Some(self.build_retry())
+                    } else {
+                        // Unlikely: the downcast back to the concrete request type failed.
+                        error!(
+                            message =
+                                "OK/retry response but invalid request; dropping the request.",
+                            internal_log_rate_limit = true,
+                        );
+                        None
+                    }
+                }
+
                 RetryAction::DontRetry(reason) => {
                     error!(message = "Not retriable; dropping the request.", reason = ?reason, internal_log_rate_limit = true);
                     None
@@ -220,7 +254,7 @@ impl Future for RetryPolicyFuture {
 
 impl RetryAction {
     pub const fn is_retryable(&self) -> bool {
-        matches!(self, RetryAction::Retry(_))
+        matches!(self, RetryAction::Retry(_) | RetryAction::RetryPartial(_))
     }
 
     pub const fn is_not_retryable(&self) -> bool {
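For review, a minimal standalone sketch of the selection pattern the retry closure uses: each document in the original bulk is paired with a per-item "retriable" flag computed from its status, and only the flagged documents are kept for the rebuilt request. The positional zip is sound because Elasticsearch's bulk API returns one status per item in request order. The `select_failed` helper and all names below are illustrative only, not part of this diff.

```rust
// Illustrative only: mirrors the zip/filter logic inside `EsRetryClosure`'s closure.
fn select_failed<T: Clone>(original: &[T], retriable: &[bool]) -> Vec<T> {
    original
        .iter()
        .zip(retriable.iter())
        .filter(|(_, &flag)| flag)
        .map(|(item, _)| item.clone())
        .collect()
}

fn main() {
    // Per-item statuses [200, 429, 400, 503] yield flags [false, true, false, true]:
    // 429 (backpressure) and 5xx (server errors) are retriable; 200 and 400 are not,
    // so "b" and "d" are the only documents included in the retried bulk.
    let docs = vec!["a", "b", "c", "d"];
    let retriable = vec![false, true, false, true];
    assert_eq!(select_failed(&docs, &retriable), vec!["b", "d"]);
}
```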