feat(java): add non-blocking AsyncScanner with CompletableFuture API#6102
feat(java): add non-blocking AsyncScanner with CompletableFuture API#6102LuQQiu merged 13 commits intolance-format:mainfrom
Conversation
PR Review: feat(java): add non-blocking AsyncScanner with CompletableFuture APIP0: Race condition in
|
Review Comments AddressedThank you for the detailed review! I've addressed all three issues: ✅ P0: Fixed race condition in
|
77cd10f to
0d160ba
Compare
|
@hamersaw seems to be a performance improvement for presto/trino |
hamersaw
left a comment
There was a problem hiding this comment.
Thanks! This looks really good, mostly just questions on correct cleanup / logging in case of failures.
| // Attach ONCE and never detach - this is the key optimization | ||
| let mut env = jvm | ||
| .attach_current_thread_permanently() | ||
| .expect("Failed to attach dispatcher to JVM"); |
There was a problem hiding this comment.
We probably do not want to use expect anywhere in here because it will panic and crash the JVM. I think there are other places in the PR where we probably don't want to use expect as well.
There was a problem hiding this comment.
Good call, I will double check. But for the "expect" here, I just want the JVM crash to make it fail fast. Otherwise the issue will be very difficult to catch or debug later.
There was a problem hiding this comment.
Fixed in a06c692. Kept expect() here for fail-fast initialization (JVM should crash if dispatcher can't initialize), but replaced the runtime expect() at async_scanner.rs:91 with error logging to prevent crashes during normal operation.
|
|
||
| // Send result to dispatcher for Java completion | ||
| let dispatcher = DISPATCHER.get().expect("Dispatcher not initialized"); | ||
| let _ = dispatcher.send(DispatcherMessage { |
There was a problem hiding this comment.
Lets capture and at least log the error here.
There was a problem hiding this comment.
Fixed in a06c692. Added error logging for both dispatcher initialization check and send failures.
| let scanner_guard = | ||
| unsafe { env.get_rust_field::<_, _, AsyncScanner>(&j_scanner, NATIVE_ASYNC_SCANNER)? }; | ||
|
|
||
| scanner_guard.start_scan(task_id, scanner_global_ref); |
There was a problem hiding this comment.
| let scanner_guard = | |
| unsafe { env.get_rust_field::<_, _, AsyncScanner>(&j_scanner, NATIVE_ASYNC_SCANNER)? }; | |
| scanner_guard.start_scan(task_id, scanner_global_ref); | |
| // Clone the Arc<Scanner> and drop the MutexGuard before calling start_scan, | |
| // which does block_on internally. Holding the guard across block_on risks deadlock. | |
| let scanner = { | |
| let guard = | |
| unsafe { env.get_rust_field::<_, _, AsyncScanner>(&j_scanner, NATIVE_ASYNC_SCANNER)? }; | |
| guard.inner.clone() | |
| }; | |
| AsyncScanner::start_scan(scanner, task_id, scanner_global_ref); |
This way we ensure correct releasing of resources.
There was a problem hiding this comment.
Fixed in a06c692. Implemented exactly as suggested - clone the Arc<Scanner> and drop the guard before calling start_scan_with_scanner to prevent deadlock.
| }); | ||
|
|
||
| // Register task for cancellation support | ||
| RT.block_on(async { |
There was a problem hiding this comment.
Do we need some kind of two-phase tracking here? For example, before we spawn the task ^^ we register it, and then here we commit it. In the current implementation, however unlikely, if the I/O task completes before we reach this statement it can drop the _cleanup_guard and things will never be cleaned up from the TASK_TRACKER. A two-phase commit would ensure this is impossible.
There was a problem hiding this comment.
Excellent catch! Fixed in a06c692 with a three-step two-phase registration pattern:
- Pre-register with placeholder handle BEFORE spawning
- Spawn the actual task
- Update registration with real handle
This guarantees the task is tracked before the cleanup guard can run. Added TaskTracker::update_handle() method to atomically replace the placeholder with the real handle.
beinan
left a comment
There was a problem hiding this comment.
Fixed in a06c692:
- Kept
expect()in dispatcher initialization for fail-fast behavior (as discussed) - Replaced runtime
expect()at async_scanner.rs:91 with error logging to prevent JVM crashes during task completion - Added error logging for
dispatcher.send()failures instead of silently ignoring them - Fixed deadlock by cloning
Arc<Scanner>and dropping guard before callingstart_scan_with_scanner - Implemented two-phase registration pattern: pre-register with placeholder → spawn task → update with real handle
- Added
TaskTracker::update_handle()for atomic handle updates
All issues have been addressed. The dispatcher initialization still fails fast as intended, but runtime operations now handle errors gracefully.
Implements a parallel async scanner alongside the existing blocking LanceScanner to prevent thread starvation in Java query engines like Presto and Trino. ## Key Features - **Non-blocking I/O**: Spawns Tokio tasks instead of blocking Java threads - **CompletableFuture API**: Native Java async patterns for better integration - **Persistent JNI dispatcher**: Single thread attached to JVM for zero-overhead callbacks - **Task-based architecture**: Uses task IDs instead of JNI refs to prevent memory leaks - **Full feature parity**: Supports filters, projections, vector search, FTS, aggregates - **Clean cancellation**: Proper task cleanup without resource leaks ## Implementation ### Rust Components - **dispatcher.rs**: Persistent JNI thread with cached method IDs for callbacks - **task_tracker.rs**: Thread-safe task registry using RwLock<HashMap> - **async_scanner.rs**: AsyncScanner with Tokio task spawning and JNI exports - **lib.rs**: JNI_OnLoad hook to initialize global dispatcher on library load ### Java Components - **AsyncScanner.java**: CompletableFuture-based async API with task management - **AsyncScannerTest.java**: 6 comprehensive examples demonstrating usage patterns ## Architecture Uses the "Task ID + Dispatcher" pattern: 1. Java manages futures in ConcurrentHashMap<taskId, CompletableFuture> 2. Rust spawns async I/O tasks and returns immediately 3. Lock-free channel carries completion messages 4. Persistent dispatcher thread completes Java futures via JNI callbacks ## Testing ```bash ./mvnw test -Dtest=AsyncScannerTest ``` ## Compatibility - Parallel to existing LanceScanner (no breaking changes) - Same ScanOptions API for consistency - Opt-in: users choose blocking or async based on needs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add #[allow(clippy::too_many_arguments)] to inner_create_async_scanner - Remove needless borrows in dispatcher.rs call_method_unchecked calls
P0: Fix race condition in start_scan - Clone GlobalRef so spawned task has its own copy - Always send completion message even if task wasn't tracked - Prevents Java futures from hanging if task completes before registration P1: Remove code duplication - Extract build_full_text_search_query to pub(crate) in blocking_scanner - Import and reuse from async_scanner instead of duplicating ~130 lines - Ensures both scanners stay in sync when FTS query types are updated P1: Fix JNI signature mismatch - Remove unused 'handle' parameter from releaseNativeScanner in Java - Aligns Java signature with Rust implementation that only uses j_scanner Additional: - Add #[allow(dead_code)] to scanner_global_ref field (used for cleanup) - Suppress false positive warning about unused field
Improved code readability by extracting error and success handling into separate helper functions, reducing nesting depth in the dispatcher's event loop. Changes: - Extract handle_error() for error completion path - Extract handle_success() for success completion path - Use match expression instead of if-let for cleaner dispatch
Eliminated ~260 lines of duplicated scanner building logic between blocking_scanner.rs and async_scanner.rs by introducing a shared ScannerOptions struct and build_scanner_with_options function. Changes: - Add ScannerOptions struct to hold all JNI scanner parameters - Add build_scanner_with_options() as shared builder function - Refactor inner_create_scanner to use shared builder (~130 lines → ~45 lines) - Refactor inner_create_async_scanner to use shared builder (~130 lines → ~45 lines) - Remove unused imports from async_scanner.rs - Add explicit lifetimes to all JObject parameters for type safety Benefits: - Single source of truth for scanner building logic - Easier maintenance (fix bugs once, add features once) - No performance penalty (struct is stack-allocated and inlined) - Removes clippy "too many arguments" concerns from inner functions All tests passing with zero performance regression.
Fix formatting issues caught by CI: - Alphabetize imports (ScannerOptions before build_scanner_with_options) - Align parameter comments consistently - Format long function calls across multiple lines - Simplify jvalue struct initialization Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the "magic flag" pattern (error_msg: Option<String>) with idiomatic Rust Result<i64, String> for better type safety and clarity. Changes: - DispatcherMessage.result: Result<i64, String> instead of separate result_ptr and error_msg fields - Use Ok(ptr) for success case instead of (ptr, None) tuple - Use Err(msg) for error case instead of (-1, Some(msg)) tuple - Match on Result variants (Ok/Err) instead of Option (Some/None) Benefits: - Self-documenting: Result explicitly represents success/failure - Type-safe: Can't accidentally use result_ptr when there's an error - Idiomatic: Standard Rust pattern every developer understands - No magic numbers: Eliminates the -1 sentinel value All tests passing (AsyncScannerTest: 6/6). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implement TaskCleanupGuard using RAII to guarantee task removal from HashMap, preventing memory leaks in two scenarios: 1. Race condition where task completes before registration 2. Task panic before manual cleanup Changes: - Add TaskCleanupGuard struct with Drop trait implementation - Guard automatically removes task_id from tracker when dropped - Works on normal completion, panic, or cancellation - Use RT.spawn() in Drop instead of block_on() to avoid runtime nesting Benefits: - Compiler-guaranteed cleanup - impossible to forget - Exception-safe - works even when task panics - Zero runtime cost - compiler inlines the guard TODO: Add timeout-based cleanup for additional defense-in-depth (see detailed proposal in task_tracker.rs) All tests passing (AsyncScannerTest: 6/6). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
AsyncScanner was missing the prefilter parameter that was added to LanceScanner in upstream. This caused AsyncScanner to ignore the prefilter setting from ScanOptions. Changes: - Add options.isPrefilter() call in createAsyncScanner invocation - Add prefilter boolean parameter to native method signature This brings AsyncScanner in sync with LanceScanner and ensures all scanner options are properly forwarded. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Replace runtime expect() with error logging to prevent JVM crashes - Log dispatcher send errors instead of silently ignoring - Fix potential deadlock by cloning Arc<Scanner> before start_scan - Implement two-phase registration to prevent race condition - Add TaskTracker::update_handle() for atomic handle updates Reviewer: hamersaw
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
dc25fef to
4cebdbf
Compare
|
Hi @hamersaw, could you help take a look again? Thanks! |
|
@hamersaw Circling back on this—since tomorrow is my last day at Uber, I’m aiming to wrap up all outstanding PRs today. Do you have time for a final look so we can get this merged? |
| result, | ||
| }) { | ||
| log::error!( | ||
| "Failed to send completion message for task {}: {}", |
There was a problem hiding this comment.
should we clean up the result ptr if any error happens during send?
There was a problem hiding this comment.
Good catch! Added cleanup for the FFI stream pointer in both error paths — when the dispatcher is not initialized and when send() fails. Also save the pointer before sending so we can reclaim it without cloning the result. (474a70c)
|
|
||
| match msg.result { | ||
| Err(error) => { | ||
| handle_error(&mut env, scanner_obj, fail_method, msg.task_id, &error) |
There was a problem hiding this comment.
what would happen if we have error when handling error? is there a way like finally to cleanup the things
There was a problem hiding this comment.
Great question. Added env.exception_clear() after every failed JNI call (new_string, failTask, completeTask) to clear any pending exception and protect the dispatcher loop from corruption. For handle_success, we also free the FFI stream pointer since Java will never receive it. This effectively acts as a finally block for each message dispatch. (474a70c)
Address reviewer feedback on resource cleanup: - async_scanner: free FFI stream pointer when dispatcher is missing or send fails, preventing memory leaks - dispatcher: clear pending JNI exceptions after failed JNI calls to protect the dispatcher loop from corruption - dispatcher: free FFI stream pointer when completeTask call fails, since Java will never receive it Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Code reviewFound 2 issues:
lance/java/src/main/java/org/lance/ipc/AsyncScanner.java Lines 149 to 163 in 474a70c lance/java/src/main/java/org/lance/ipc/AsyncScanner.java Lines 191 to 200 in 474a70c lance/java/lance-jni/src/dispatcher.rs Lines 142 to 157 in 474a70c
lance/java/src/test/java/org/lance/AsyncScannerTest.java Lines 157 to 178 in 474a70c 🤖 Generated with Claude Code - If this code review was useful, please react with 👍. Otherwise, react with 👎. |
hamersaw
left a comment
There was a problem hiding this comment.
Appreciate the effort on getting this right! It's going to be a big improvement I think.
…ance-format#6102) ## Summary Implements a parallel async scanner alongside the existing blocking `LanceScanner` to prevent thread starvation in Java query engines like Presto and Trino. This PR adds **AsyncScanner** - a non-blocking alternative to `LanceScanner` that uses `CompletableFuture` for true async I/O operations. ## Motivation Query engines like Presto and Trino rely on non-blocking I/O to efficiently multiplex thousands of concurrent queries on a limited thread pool. The current `LanceScanner` blocks Java threads during Rust I/O operations, causing thread starvation and poor performance in these environments. ## Key Features ✅ **Non-blocking I/O**: Spawns Tokio tasks instead of blocking Java threads ✅ **CompletableFuture API**: Native Java async patterns for seamless integration ✅ **Persistent JNI dispatcher**: Single thread attached to JVM for zero-overhead callbacks ✅ **Task-based architecture**: Uses task IDs instead of JNI refs to prevent memory leaks ✅ **Full feature parity**: Supports filters, projections, vector search, FTS, aggregates ✅ **Clean cancellation**: Proper task cleanup without resource leaks ✅ **Parallel to existing API**: No breaking changes, `LanceScanner` unchanged ## Architecture The implementation uses the **"Task ID + Dispatcher" pattern**: 1. **Java manages futures**: `ConcurrentHashMap<taskId, CompletableFuture<Long>>` maps task IDs to pending requests 2. **Rust spawns async tasks**: Returns immediately while Tokio handles I/O in background 3. **Lock-free completion channel**: Carries `(taskId, resultPointer)` from Tokio to dispatcher 4. **Persistent dispatcher thread**: Attaches to JVM once, completes Java futures via cached JNI method IDs ### Components **Rust (`java/lance-jni/src/`):** - `dispatcher.rs` - Persistent JNI thread with cached method IDs for callbacks - `task_tracker.rs` - Thread-safe task registry using `RwLock<HashMap>` - `async_scanner.rs` - AsyncScanner with Tokio task spawning and JNI exports - `lib.rs` - Modified to add `JNI_OnLoad` hook for dispatcher initialization **Java (`java/src/main/java/org/lance/ipc/`):** - `AsyncScanner.java` - CompletableFuture-based async API with task management **Tests (`java/src/test/java/org/lance/`):** - `AsyncScannerTest.java` - 6 comprehensive examples demonstrating usage patterns ## Usage Examples ### Basic async scan ```java ScanOptions options = new ScanOptions.Builder().batchSize(20L).build(); try (AsyncScanner scanner = AsyncScanner.create(dataset, options, allocator)) { CompletableFuture<ArrowReader> future = scanner.scanBatchesAsync(); ArrowReader reader = future.get(10, TimeUnit.SECONDS); while (reader.loadNextBatch()) { // Process batches without blocking } } ``` ### Multiple concurrent scans (key benefit for Presto/Trino) ```java List<CompletableFuture<Integer>> futures = new ArrayList<>(); for (int i = 0; i < 100; i++) { AsyncScanner scanner = AsyncScanner.create(dataset, options, allocator); futures.add(scanner.scanBatchesAsync() .thenApply(reader -> processInBackground(reader))); } // All scans run in parallel without blocking threads! CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .get(30, TimeUnit.SECONDS); ``` ## Testing ```bash cd java ./mvnw test -Dtest=AsyncScannerTest ./mvnw compile # Full build verification ``` All tests pass ✅ ## Compatibility - ✅ No breaking changes to existing APIs - ✅ `LanceScanner` remains unchanged - ✅ Uses same `ScanOptions` for consistency - ✅ Opt-in: users choose blocking or async based on their needs ## Performance Benefits For query engines running hundreds of concurrent queries: - **Before**: Thread pool exhaustion as threads block on I/O - **After**: Threads immediately return, I/O happens in background ## Checklist - [x] Rust code compiles (`cargo check`) - [x] Java code compiles (`./mvnw compile`) - [x] Code formatted (`./mvnw spotless:apply && cargo fmt`) - [x] Comprehensive tests added (`AsyncScannerTest.java`) - [x] Documentation in code examples - [x] No breaking changes ## Related Issues Addresses the need for non-blocking I/O in Java query engines that integrate with LanceDB. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Summary
Implements a parallel async scanner alongside the existing blocking
LanceScannerto prevent thread starvation in Java query engines like Presto and Trino.This PR adds AsyncScanner - a non-blocking alternative to
LanceScannerthat usesCompletableFuturefor true async I/O operations.Motivation
Query engines like Presto and Trino rely on non-blocking I/O to efficiently multiplex thousands of concurrent queries on a limited thread pool. The current
LanceScannerblocks Java threads during Rust I/O operations, causing thread starvation and poor performance in these environments.Key Features
✅ Non-blocking I/O: Spawns Tokio tasks instead of blocking Java threads
✅ CompletableFuture API: Native Java async patterns for seamless integration
✅ Persistent JNI dispatcher: Single thread attached to JVM for zero-overhead callbacks
✅ Task-based architecture: Uses task IDs instead of JNI refs to prevent memory leaks
✅ Full feature parity: Supports filters, projections, vector search, FTS, aggregates
✅ Clean cancellation: Proper task cleanup without resource leaks
✅ Parallel to existing API: No breaking changes,
LanceScannerunchangedArchitecture
The implementation uses the "Task ID + Dispatcher" pattern:
ConcurrentHashMap<taskId, CompletableFuture<Long>>maps task IDs to pending requests(taskId, resultPointer)from Tokio to dispatcherComponents
Rust (
java/lance-jni/src/):dispatcher.rs- Persistent JNI thread with cached method IDs for callbackstask_tracker.rs- Thread-safe task registry usingRwLock<HashMap>async_scanner.rs- AsyncScanner with Tokio task spawning and JNI exportslib.rs- Modified to addJNI_OnLoadhook for dispatcher initializationJava (
java/src/main/java/org/lance/ipc/):AsyncScanner.java- CompletableFuture-based async API with task managementTests (
java/src/test/java/org/lance/):AsyncScannerTest.java- 6 comprehensive examples demonstrating usage patternsUsage Examples
Basic async scan
Multiple concurrent scans (key benefit for Presto/Trino)
Testing
All tests pass ✅
Compatibility
LanceScannerremains unchangedScanOptionsfor consistencyPerformance Benefits
For query engines running hundreds of concurrent queries:
Checklist
cargo check)./mvnw compile)./mvnw spotless:apply && cargo fmt)AsyncScannerTest.java)Related Issues
Addresses the need for non-blocking I/O in Java query engines that integrate with LanceDB.
🤖 Generated with Claude Code