Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
287571c
Update to tower 0.5.
Serendo Feb 13, 2025
7829287
Add a RetryPartial variant to RetryAction. which is a closure and imp…
Serendo Feb 13, 2025
7561898
add a changelog fragment for elasticsearch sink retry partial
Serendo Feb 14, 2025
e02e6ad
changed some threshold of the tests. But I dont quite understand how …
Serendo Feb 14, 2025
454d3a2
add newline to changelog end
Serendo Feb 15, 2025
20f2829
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
pront Mar 11, 2025
6edf77f
cargo.lock update
pront Mar 11, 2025
3b241d1
Update changelog.d/elasticsearch_sink_skip_retrying_succeeded_documen…
pront Mar 25, 2025
a6badfc
Merge branch 'vectordotdev:master' into es-retry-tower0.5
Serendo Mar 27, 2025
4a7ab30
remove unwrap for safety.
Serendo Apr 15, 2025
c3f317a
cleanup the code.
Serendo Apr 15, 2025
0034f56
Merge branch 'master' into es-retry-tower0.5
Serendo Apr 16, 2025
19cd25b
Put original events and es request builder into the ElasticsearchReq…
Serendo Apr 16, 2025
6c98e8e
merge master changes
Serendo Jun 25, 2025
0418ace
fix merge conflict
Serendo Jun 25, 2025
09530fa
Merge branch 'master' into es-retry-tower0.5
pront Jun 25, 2025
ae3d727
Update src/sinks/util/retries.rs
Serendo Jul 16, 2025
c5dfe91
Update src/sinks/elasticsearch/retry.rs
Serendo Jul 16, 2025
fb5b597
Update src/sinks/elasticsearch/retry.rs
Serendo Jul 30, 2025
3b1cac7
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
Serendo Jul 31, 2025
522feb6
as pront suggested
Serendo Jul 31, 2025
b6edcf7
Merge branch 'master' into es-retry-tower0.5
Serendo Jul 31, 2025
e972da4
Remove downcast ref and trait usage from partial retry logic
thomasqueirozb Aug 6, 2025
79e7dd3
Fix make check-clippy errors
thomasqueirozb Aug 6, 2025
abeb520
Revert "Fix make check-clippy errors"
thomasqueirozb Aug 6, 2025
3bd9c0d
Revert "Remove downcast ref and trait usage from partial retry logic"
thomasqueirozb Aug 6, 2025
5eb44e1
Merge remote-tracking branch 'origin/master' into es-retry-tower0.5
thomasqueirozb Aug 6, 2025
1b64e9e
Revert cmake bump in Cargo.lock
thomasqueirozb Aug 6, 2025
ce35e6d
cargo fmt
thomasqueirozb Aug 6, 2025
2b401f4
Address PR feedback
thomasqueirozb Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `request_retry_partial` behavior of the `elasticsearch` sink was changed. Now only the failed, retriable requests in a bulk request are retried (instead of all requests in the bulk).

authors: Serendo
8 changes: 4 additions & 4 deletions src/sinks/elasticsearch/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
},
};

#[derive(Serialize)]
#[derive(Serialize, Clone, Debug)]
pub enum DocumentVersionType {
External,
ExternalGte,
Expand All @@ -34,20 +34,20 @@ impl DocumentVersionType {
}
}

#[derive(Serialize)]
#[derive(Serialize, Clone, Debug)]
pub struct DocumentVersion {
pub kind: DocumentVersionType,
pub value: u64,
}

#[derive(Serialize)]
#[derive(Serialize, Clone, Debug)]
pub enum DocumentMetadata {
WithoutId,
Id(String),
IdAndVersion(String, DocumentVersion),
}

#[derive(Serialize)]
#[derive(Serialize, Clone, Debug)]
pub struct ProcessedEvent {
pub index: String,
pub bulk_action: BulkAction,
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/elasticsearch/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Metadata {
finalizers: EventFinalizers,
batch_size: usize,
events_byte_size: JsonSize,
original_events: Vec<ProcessedEvent>,
}

impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
Expand Down Expand Up @@ -60,6 +61,7 @@ impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
finalizers: events.take_finalizers(),
batch_size: events.len(),
events_byte_size,
original_events: events.clone(),
};
(es_metadata, metadata_builder, events)
}
Expand All @@ -76,6 +78,8 @@ impl RequestBuilder<Vec<ProcessedEvent>> for ElasticsearchRequestBuilder {
batch_size: es_metadata.batch_size,
events_byte_size: es_metadata.events_byte_size,
metadata,
original_events: es_metadata.original_events,
elasticsearch_request_builder: self.clone(),
}
}
}
91 changes: 75 additions & 16 deletions src/sinks/elasticsearch/retry.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use http::StatusCode;
use serde::Deserialize;

use crate::sinks::{
elasticsearch::encoder::ProcessedEvent,
util::{metadata::RequestMetadataBuilder, request_builder::RequestBuilder},
};
use crate::{
event::Finalizable,
http::HttpError,
sinks::{
elasticsearch::service::ElasticsearchResponse,
util::retries::{RetryAction, RetryLogic},
elasticsearch::service::{ElasticsearchRequest, ElasticsearchResponse},
util::retries::{RetryAction, RetryLogic, RetryPartialFunction},
},
};
use http::StatusCode;
use serde::Deserialize;
use vector_lib::json_size::JsonSize;
use vector_lib::EstimatedJsonEncodedSizeOf;

#[derive(Deserialize, Debug)]
struct EsResultResponse {
Expand Down Expand Up @@ -86,6 +92,23 @@ pub struct ElasticsearchRetryLogic {
pub retry_partial: bool,
}

/// Wraps a boxed closure that maps one `ElasticsearchRequest` to another.
///
/// Construct it with
/// `EsRetryClosure { closure: Box::new(|req: ElasticsearchRequest| { new_req }) }`.
struct EsRetryClosure {
// Request-rewriting function: takes the request by value and returns its replacement.
closure: Box<dyn Fn(ElasticsearchRequest) -> ElasticsearchRequest + Send + Sync>,
}

impl RetryPartialFunction for EsRetryClosure {
    /// Applies the wrapped closure to a type-erased request.
    ///
    /// If `request` holds an `ElasticsearchRequest`, the closure's output is returned
    /// (boxed again); any other payload is handed back untouched.
    fn modify_request(&self, request: Box<dyn std::any::Any>) -> Box<dyn std::any::Any> {
        // Bail out early with the original box when the payload is not ours.
        let es_request = match request.downcast::<ElasticsearchRequest>() {
            Ok(boxed) => *boxed,
            Err(untouched) => return untouched,
        };
        Box::new((self.closure)(es_request))
    }
}

impl RetryLogic for ElasticsearchRetryLogic {
type Error = HttpError;
type Response = ElasticsearchResponse;
Expand Down Expand Up @@ -124,21 +147,57 @@ impl RetryLogic for ElasticsearchRetryLogic {
// We will retry if there exists at least one item that
// failed with a retriable error.
// Those are backpressure and server errors.
if let Some((status, error)) =
let status_codes: Vec<bool> = resp
.iter_status()
.map(|(status, _)| {
status == StatusCode::TOO_MANY_REQUESTS
|| status.is_server_error()
})
.collect();
if let Some((_status, _error)) =
resp.iter_status().find(|(status, _)| {
*status == StatusCode::TOO_MANY_REQUESTS
|| status.is_server_error()
})
{
let msg = if let Some(error) = error {
format!(
"partial error, status: {}, error type: {}, reason: {}",
status, error.err_type, error.reason
)
} else {
format!("partial error, status: {status}")
};
return RetryAction::Retry(msg.into());
return RetryAction::RetryPartial(Box::new(EsRetryClosure {
closure: Box::new(move |req: ElasticsearchRequest| {
let mut failed_events: Vec<ProcessedEvent> = req
.original_events
.clone()
.into_iter()
.zip(status_codes.iter())
.filter(|(_, &flag)| flag)
.map(|(item, _)| item)
.collect();
let finalizers = failed_events.take_finalizers();
let batch_size = failed_events.len();
let events_byte_size = failed_events
.iter()
.map(|x| x.log.estimated_json_encoded_size_of())
.fold(JsonSize::zero(), |a, b| a + b);
let encode_result = match req
.elasticsearch_request_builder
.encode_events(failed_events.clone())
{
Ok(s) => s,
Err(_) => return req,
};
let metadata_builder =
RequestMetadataBuilder::from_events(&failed_events);
let metadata = metadata_builder.build(&encode_result);
ElasticsearchRequest {
payload: encode_result.into_payload(),
finalizers,
batch_size,
events_byte_size,
metadata,
original_events: failed_events,
elasticsearch_request_builder: req
.elasticsearch_request_builder,
}
}),
}));
}
}

Expand Down Expand Up @@ -201,7 +260,7 @@ mod tests {
event_status: EventStatus::Errored,
events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
}),
RetryAction::Retry(_)
RetryAction::RetryPartial(_)
));
}

Expand Down
13 changes: 9 additions & 4 deletions src/sinks/elasticsearch/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ use super::{ElasticsearchCommon, ElasticsearchConfig};
use crate::{
event::{EventFinalizers, EventStatus, Finalizable},
http::HttpClient,
sinks::util::{
auth::Auth,
http::{HttpBatchService, RequestConfig},
Compression, ElementCount,
sinks::{
elasticsearch::{encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder},
util::{
auth::Auth,
http::{HttpBatchService, RequestConfig},
Compression, ElementCount,
},
},
};

Expand All @@ -33,6 +36,8 @@ pub struct ElasticsearchRequest {
pub batch_size: usize,
pub events_byte_size: JsonSize,
pub metadata: RequestMetadata,
pub original_events: Vec<ProcessedEvent>, //store original_events for reconstruct request when retrying
pub elasticsearch_request_builder: ElasticsearchRequestBuilder,
}

impl ByteSizeOf for ElasticsearchRequest {
Expand Down
38 changes: 36 additions & 2 deletions src/sinks/util/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ use crate::Error;
pub enum RetryAction {
/// Indicate that this request should be retried, with a reason.
Retry(Cow<'static, str>),
/// Indicate that only a portion of this request should be retried. The boxed
/// [`RetryPartialFunction`] is applied to the original request to produce the
/// request that is actually retried.
RetryPartial(Box<dyn RetryPartialFunction>),
/// Indicate that this request should not be retried, with a reason.
DontRetry(Cow<'static, str>),
/// Indicate that this request should not be retried because it was successful.
Successful,
}

/// Rewrites a request before a partial retry.
///
/// The request is passed in type-erased as `Box<dyn Any>` and the rewritten request is
/// returned the same way; the caller downcasts the result back to its concrete request type.
pub trait RetryPartialFunction {
/// Takes ownership of the type-erased `request` and returns the request to use for the retry.
fn modify_request(&self, request: Box<dyn std::any::Any>) -> Box<dyn std::any::Any>;
}

pub trait RetryLogic: Clone + Send + Sync + 'static {
type Error: std::error::Error + Send + Sync + 'static;
type Response;
Expand Down Expand Up @@ -141,7 +147,7 @@ where

// NOTE: in the error cases, `Error` and `EventsDropped` internal events are emitted by the
// driver, so we only need to log here.
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
match result {
Ok(response) => match self.logic.should_retry_response(response) {
RetryAction::Retry(reason) => {
Expand All @@ -157,6 +163,34 @@ where
Some(self.build_retry())
}

RetryAction::RetryPartial(rebuild_request_fn) => {
if self.remaining_attempts == 0 {
error!(
message =
"OK/retry response but retries exhausted; dropping the request.",
internal_log_rate_limit = true,
);
return None;
}
let output = rebuild_request_fn.modify_request(Box::new(req.clone()));
if let Ok(output) = output.downcast::<Req>() {
*req = *output;
error!(
message = "OK/retrying partial after response.",
internal_log_rate_limit = true
);
Some(self.build_retry())
} else {
// unlikely to go here.
error!(
message =
"OK/retry response but invalid request; dropping the request.",
internal_log_rate_limit = true,
);
None
}
}

RetryAction::DontRetry(reason) => {
error!(message = "Not retriable; dropping the request.", reason = ?reason, internal_log_rate_limit = true);
None
Expand Down Expand Up @@ -220,7 +254,7 @@ impl Future for RetryPolicyFuture {

impl RetryAction {
pub const fn is_retryable(&self) -> bool {
matches!(self, RetryAction::Retry(_))
matches!(self, RetryAction::Retry(_) | RetryAction::RetryPartial(_))
}

pub const fn is_not_retryable(&self) -> bool {
Expand Down
Loading