Skip to content

Commit 45cd513

Browse files
committed
Changes in the lambda offloading
Lambda now reports (but does not retry) which splits failed and which succeeded. The leaf cache now stores each individual split result returned by the lambda (keeping track of the rewritten request in order to do so). The Lambda handler returns results tagged with their split ID, in any order.
1 parent 794af6d commit 45cd513

File tree

9 files changed

+180
-76
lines changed

9 files changed

+180
-76
lines changed

quickwit/quickwit-lambda-client/src/invoker.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use aws_sdk_lambda::primitives::Blob;
2121
use aws_sdk_lambda::types::InvocationType;
2222
use base64::prelude::*;
2323
use prost::Message;
24-
use quickwit_lambda_server::{LeafSearchRequestPayload, LeafSearchResponsePayload};
25-
use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse, LeafSearchResponses};
24+
use quickwit_lambda_server::{LambdaSearchRequestPayload, LambdaSearchResponsePayload};
25+
use quickwit_proto::search::{LambdaSearchResponses, LambdaSingleSplitResult, LeafSearchRequest};
2626
use quickwit_search::{LambdaLeafSearchInvoker, SearchError};
2727
use tracing::{debug, info, instrument};
2828

@@ -85,7 +85,7 @@ impl LambdaLeafSearchInvoker for AwsLambdaInvoker {
8585
async fn invoke_leaf_search(
8686
&self,
8787
request: LeafSearchRequest,
88-
) -> Result<Vec<LeafSearchResponse>, SearchError> {
88+
) -> Result<Vec<LambdaSingleSplitResult>, SearchError> {
8989
let start = std::time::Instant::now();
9090

9191
let result = self.invoke_leaf_search_inner(request).await;
@@ -109,10 +109,10 @@ impl AwsLambdaInvoker {
109109
async fn invoke_leaf_search_inner(
110110
&self,
111111
request: LeafSearchRequest,
112-
) -> Result<Vec<LeafSearchResponse>, SearchError> {
112+
) -> Result<Vec<LambdaSingleSplitResult>, SearchError> {
113113
// Serialize request to protobuf bytes, then base64 encode
114114
let request_bytes = request.encode_to_vec();
115-
let payload = LeafSearchRequestPayload {
115+
let payload = LambdaSearchRequestPayload {
116116
payload: BASE64_STANDARD.encode(&request_bytes),
117117
};
118118

@@ -164,22 +164,22 @@ impl AwsLambdaInvoker {
164164
.leaf_search_response_payload_size_bytes
165165
.observe(response_payload.as_ref().len() as f64);
166166

167-
let lambda_response: LeafSearchResponsePayload =
167+
let lambda_response: LambdaSearchResponsePayload =
168168
serde_json::from_slice(response_payload.as_ref())
169169
.map_err(|e| SearchError::Internal(format!("json deserialization error: {}", e)))?;
170170

171171
let response_bytes = BASE64_STANDARD
172172
.decode(&lambda_response.payload)
173173
.map_err(|e| SearchError::Internal(format!("base64 decode error: {}", e)))?;
174174

175-
let leaf_responses = LeafSearchResponses::decode(&response_bytes[..])
175+
let leaf_responses = LambdaSearchResponses::decode(&response_bytes[..])
176176
.map_err(|e| SearchError::Internal(format!("protobuf decode error: {}", e)))?;
177177

178178
debug!(
179-
num_responses = leaf_responses.responses.len(),
179+
num_results = leaf_responses.split_results.len(),
180180
"lambda invocation completed"
181181
);
182182

183-
Ok(leaf_responses.responses)
183+
Ok(leaf_responses.split_results)
184184
}
185185
}

quickwit/quickwit-lambda-client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,4 @@ mod metrics;
3434
pub use deploy::try_get_or_deploy_invoker;
3535
pub use metrics::LAMBDA_METRICS;
3636
// Re-export payload types from server crate for convenience
37-
pub use quickwit_lambda_server::{LeafSearchRequestPayload, LeafSearchResponsePayload};
37+
pub use quickwit_lambda_server::{LambdaSearchRequestPayload, LambdaSearchResponsePayload};

quickwit/quickwit-lambda-server/src/bin/leaf_search.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
use std::sync::Arc;
1818

1919
use lambda_runtime::{Error, LambdaEvent, service_fn};
20-
use quickwit_lambda_server::{LambdaSearcherContext, LeafSearchRequestPayload, handle_leaf_search};
20+
use quickwit_lambda_server::{
21+
LambdaSearchRequestPayload, LambdaSearcherContext, handle_leaf_search,
22+
};
2123
use tracing::info;
2224
use tracing_subscriber::EnvFilter;
2325

@@ -36,7 +38,7 @@ async fn main() -> Result<(), Error> {
3638

3739
// Run the Lambda handler
3840
lambda_runtime::run(service_fn(
39-
|event: LambdaEvent<LeafSearchRequestPayload>| {
41+
|event: LambdaEvent<LambdaSearchRequestPayload>| {
4042
let ctx = Arc::clone(&context);
4143
async move {
4244
let (payload, _event_ctx) = event.into_parts();

quickwit/quickwit-lambda-server/src/handler.rs

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414

1515
use base64::prelude::*;
1616
use prost::Message;
17+
use quickwit_proto::search::lambda_single_split_result::Outcome;
1718
use quickwit_proto::search::{
18-
LeafSearchRequest, LeafSearchResponse, LeafSearchResponses, SplitIdAndFooterOffsets,
19+
LambdaSearchResponses, LambdaSingleSplitResult, LeafSearchRequest, SplitIdAndFooterOffsets,
1920
};
2021
use quickwit_search::leaf::multi_index_leaf_search;
2122
use serde::{Deserialize, Serialize};
@@ -26,27 +27,28 @@ use crate::error::{LambdaError, LambdaResult};
2627

2728
/// Payload for leaf search Lambda invocation.
2829
#[derive(Debug, Serialize, Deserialize)]
29-
pub struct LeafSearchRequestPayload {
30+
pub struct LambdaSearchRequestPayload {
3031
/// Base64-encoded serialized LeafSearchRequest protobuf.
3132
pub payload: String,
3233
}
3334

3435
/// Response from leaf search Lambda invocation.
3536
#[derive(Debug, Serialize, Deserialize)]
36-
pub struct LeafSearchResponsePayload {
37-
/// Base64-encoded serialized `LeafSearchResponses` protobuf (one per split).
37+
pub struct LambdaSearchResponsePayload {
38+
/// Base64-encoded serialized `LambdaSearchResponses` protobuf (one per split).
3839
pub payload: String,
3940
}
4041

4142
/// Handle a leaf search request in Lambda.
4243
///
43-
/// Returns one `LeafSearchResponse` per split. Each split is processed
44-
/// independently so that the caller can cache and merge results individually.
44+
/// Returns one `LambdaSingleSplitResult` per split, each tagged with its
45+
/// split_id. Individual split failures are reported per-split rather than
46+
/// failing the entire invocation, so the caller can retry only failed splits.
4547
#[instrument(skip(ctx), fields(request_id))]
4648
pub async fn handle_leaf_search(
47-
event: LeafSearchRequestPayload,
49+
event: LambdaSearchRequestPayload,
4850
ctx: &LambdaSearcherContext,
49-
) -> LambdaResult<LeafSearchResponsePayload> {
51+
) -> LambdaResult<LambdaSearchResponsePayload> {
5052
// Decode base64 payload
5153
let request_bytes = BASE64_STANDARD
5254
.decode(&event.payload)
@@ -70,11 +72,12 @@ pub async fn handle_leaf_search(
7072
let num_splits = all_splits.len();
7173
info!(num_splits, "processing leaf search request (per-split)");
7274

73-
// Process each split in parallel. The SearchPermitProvider inside
74-
// SearcherContext gates concurrency based on memory budget.
75-
let mut split_search_futures: Vec<tokio::task::JoinHandle<_>> =
76-
Vec::with_capacity(all_splits.len());
75+
// Process each split in parallel using a JoinSet. The SearchPermitProvider
76+
// inside SearcherContext gates concurrency based on memory budget.
77+
let mut split_search_joinset: tokio::task::JoinSet<(String, Result<_, String>)> =
78+
tokio::task::JoinSet::new();
7779
for (leaf_req_idx, split) in all_splits {
80+
let split_id = split.split_id.clone();
7881
let leaf_request_ref = &leaf_search_request.leaf_requests[leaf_req_idx];
7982
let single_split_request = LeafSearchRequest {
8083
search_request: leaf_search_request.search_request.clone(),
@@ -89,43 +92,57 @@ pub async fn handle_leaf_search(
8992

9093
let searcher_context = ctx.searcher_context.clone();
9194
let storage_resolver = ctx.storage_resolver.clone();
92-
split_search_futures.push(tokio::task::spawn(multi_index_leaf_search(
93-
searcher_context,
94-
single_split_request,
95-
storage_resolver,
96-
)));
95+
split_search_joinset.spawn(async move {
96+
let result =
97+
multi_index_leaf_search(searcher_context, single_split_request, storage_resolver)
98+
.await
99+
.map_err(|err| format!("{err}"));
100+
(split_id, result)
101+
});
97102
}
98103

99-
// Collect results, preserving split order.
100-
let mut responses: Vec<LeafSearchResponse> = Vec::with_capacity(num_splits);
101-
for split_search_fut in split_search_futures {
102-
match split_search_fut.await {
103-
Ok(Ok(response)) => responses.push(response),
104-
Ok(Err(e)) => {
105-
return Err(LambdaError::Internal(format!("leaf search failed: {e}")));
104+
// Collect results. Order is irrelevant: each result is tagged with its split_id.
105+
let mut split_results: Vec<LambdaSingleSplitResult> = Vec::with_capacity(num_splits);
106+
let mut num_successes: usize = 0;
107+
let mut num_failures: usize = 0;
108+
while let Some(join_result) = split_search_joinset.join_next().await {
109+
match join_result {
110+
Ok((split_id, Ok(response))) => {
111+
num_successes += 1;
112+
split_results.push(LambdaSingleSplitResult {
113+
split_id,
114+
outcome: Some(Outcome::Response(response)),
115+
});
116+
}
117+
Ok((split_id, Err(error_msg))) => {
118+
num_failures += 1;
119+
warn!(split_id = %split_id, error = %error_msg, "split search failed");
120+
split_results.push(LambdaSingleSplitResult {
121+
split_id,
122+
outcome: Some(Outcome::Error(error_msg)),
123+
});
106124
}
107125
Err(join_error) if join_error.is_cancelled() => {
108126
warn!("search task was cancelled");
109127
return Err(LambdaError::Cancelled);
110128
}
111129
Err(join_error) => {
130+
// Panics lose the captured split_id, so we fail the entire invocation.
112131
error!(error = %join_error, "search task panicked");
113132
return Err(LambdaError::Internal(format!(
114133
"search task panicked: {join_error}"
115134
)));
116135
}
117136
}
118137
}
119-
120138
info!(
121-
num_responses = responses.len(),
122-
"leaf search completed (per-split)"
139+
num_successes,
140+
num_failures, "leaf search completed (per-split)"
123141
);
124142

125-
// Serialize as LeafSearchResponses wrapper
126-
let wrapper = LeafSearchResponses { responses };
143+
let wrapper = LambdaSearchResponses { split_results };
127144
let response_bytes = wrapper.encode_to_vec();
128145
let payload = BASE64_STANDARD.encode(&response_bytes);
129146

130-
Ok(LeafSearchResponsePayload { payload })
147+
Ok(LambdaSearchResponsePayload { payload })
131148
}

quickwit/quickwit-lambda-server/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,4 @@ mod handler;
2323

2424
pub use context::LambdaSearcherContext;
2525
pub use error::{LambdaError, LambdaResult};
26-
pub use handler::{LeafSearchRequestPayload, LeafSearchResponsePayload, handle_leaf_search};
26+
pub use handler::{LambdaSearchRequestPayload, LambdaSearchResponsePayload, handle_leaf_search};

quickwit/quickwit-proto/protos/quickwit/search.proto

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,7 @@ message SplitSearchError {
333333
bool retryable_error = 3;
334334
}
335335

336-
/// A LeafSearchRequest can span multiple indices.
337-
///
336+
// A LeafSearchRequest can span multiple indices.
338337
message LeafSearchRequest {
339338
// Search request. This is a perfect copy of the original search request
340339
// that was sent to root apart from the start_offset, max_hits params and index_id_patterns.
@@ -361,7 +360,7 @@ message ResourceStats {
361360
uint64 cpu_microsecs = 5;
362361
}
363362

364-
/// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
363+
// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
365364
message LeafRequestRef {
366365
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
367366
uint32 doc_mapper_ord = 1;
@@ -497,10 +496,23 @@ message LeafSearchResponse {
497496
ResourceStats resource_stats = 8;
498497
}
499498

500-
// Wrapper for multiple LeafSearchResponse messages, used by Lambda to return
501-
// per-split results.
502-
message LeafSearchResponses {
503-
repeated LeafSearchResponse responses = 1;
499+
// The result of searching a single split in a Lambda invocation.
500+
// Each result is tagged with its split_id so that ordering is irrelevant.
501+
message LambdaSingleSplitResult {
502+
// The split that was searched.
503+
string split_id = 1;
504+
oneof outcome {
505+
// On success, the leaf search response for this split.
506+
LeafSearchResponse response = 2;
507+
// On failure, the error message.
508+
string error = 3;
509+
}
510+
}
511+
512+
// Wrapper for per-split results from a Lambda invocation.
513+
message LambdaSearchResponses {
514+
reserved 1; // was: repeated LeafSearchResponse responses
515+
repeated LambdaSingleSplitResult split_results = 2;
504516
}
505517

506518
message SnippetRequest {

quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Lines changed: 31 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-search/src/invoker.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
//! Trait for invoking remote serverless functions for leaf search.
1616
1717
use async_trait::async_trait;
18-
use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse};
18+
use quickwit_proto::search::{LambdaSingleSplitResult, LeafSearchRequest};
1919

2020
use crate::SearchError;
2121

@@ -27,9 +27,12 @@ use crate::SearchError;
2727
pub trait LambdaLeafSearchInvoker: Send + Sync + 'static {
2828
/// Invoke the remote function with a LeafSearchRequest.
2929
///
30-
/// Returns one `LeafSearchResponse` per split in the request.
30+
/// Returns one `LambdaSingleSplitResult` per split in the request.
31+
/// Each result is tagged with its split_id so ordering is irrelevant.
32+
/// Individual split failures are reported per-split; the outer `Result`
33+
/// only represents transport-level errors.
3134
async fn invoke_leaf_search(
3235
&self,
3336
request: LeafSearchRequest,
34-
) -> Result<Vec<LeafSearchResponse>, SearchError>;
37+
) -> Result<Vec<LambdaSingleSplitResult>, SearchError>;
3538
}

0 commit comments

Comments
 (0)