feat(compaction): binary copy capability for compaction #5434
westonpace merged 29 commits into lance-format:main
Conversation
Pull request overview
This PR introduces a binary copy optimization for compaction operations in Lance. The feature enables page-level copying of data files during compaction, bypassing the decode-recode cycle for compatible files. This significantly improves compaction performance when merging small Lance files with identical schemas and versions.
Key changes:
- Added binary copy capability with `enable_binary_copy` and related configuration options in `CompactionOptions`
- Implemented a `rewrite_files_binary_copy` function that directly copies page data and buffers from source files to output files with proper alignment
- Added version-aware handling for v2_0 vs v2_1+ file format differences (structural column materialization)
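To make the idea concrete, here is a minimal, dependency-free sketch of the kind of eligibility check binary copy implies: all input files must share the same format version and carry no deletions before their pages can be copied verbatim. The names (`FileInfo`, `can_binary_copy`) are illustrative, not the actual Lance API.

```rust
// Hypothetical sketch: binary copy is only safe when every input file has the
// same file format version and no deleted rows (deletions would require
// filtering, which defeats a raw byte copy).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct FileVersion {
    major: u32,
    minor: u32,
}

struct FileInfo {
    version: FileVersion,
    has_deletions: bool,
}

fn can_binary_copy(files: &[FileInfo]) -> bool {
    let Some(first) = files.first() else {
        return false;
    };
    files
        .iter()
        .all(|f| f.version == first.version && !f.has_deletions)
}

fn main() {
    let v = FileVersion { major: 2, minor: 1 };
    let ok = vec![
        FileInfo { version: v, has_deletions: false },
        FileInfo { version: v, has_deletions: false },
    ];
    assert!(can_binary_copy(&ok));

    let mixed = vec![
        FileInfo { version: v, has_deletions: false },
        FileInfo { version: FileVersion { major: 2, minor: 0 }, has_deletions: false },
    ];
    assert!(!can_binary_copy(&mixed));
    println!("ok");
}
```

The real check in this PR also rejects deletion files and extra global buffers, as later comments in the thread discuss.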
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| rust/lance/src/dataset/optimize.rs | Core implementation: added binary copy validation, page-level copy algorithm, footer writing, and comprehensive test coverage |
| rust/lance-file/src/writer.rs | Added initialize_with_external_metadata method to support writing Lance files with pre-built column metadata |
| rust/lance-datagen/src/generator.rs | Fixed bug: corrected TimestampMillisecondArray usage for millisecond timestamps (was using TimestampMicrosecondArray) |
```diff
- Box::new(FnGen::<i64, TimestampMicrosecondArray, _>::new_known_size(
+ Box::new(FnGen::<i64, TimestampMillisecondArray, _>::new_known_size(
```
```rust
if !self.column_writers.is_empty() {
    self.finish_writers().await?;
}
```
In binary copy, data and metadata are pre-written by direct copying, without going through `FileWriter`. However, when flushing the footer, we mock a file writer and call its `finish` method so that the existing footer-writing code can be reused as much as possible. This check is therefore needed to skip `finish_writers` in scenarios like binary copy, where `column_writers` is empty.
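The guard described above can be sketched as follows. `MockFooterWriter` is an illustrative stand-in for the real lance `FileWriter`, reduced to the one behavior under discussion: when binary copy has already written all pages and column metadata by hand, `column_writers` is empty and `finish` must skip the per-column flush while still emitting the footer.

```rust
// Illustrative stand-in, not the real lance FileWriter.
struct MockFooterWriter {
    column_writers: Vec<String>, // placeholder for real column writer state
    footer_written: bool,
    columns_flushed: bool,
}

impl MockFooterWriter {
    fn finish(&mut self) {
        if !self.column_writers.is_empty() {
            // Normal path: flush the encoders' buffered pages first.
            self.columns_flushed = true;
        }
        // Both paths converge on writing the footer.
        self.footer_written = true;
    }
}

fn main() {
    // Binary-copy path: no column writers were ever created.
    let mut binary_copy = MockFooterWriter {
        column_writers: Vec::new(),
        footer_written: false,
        columns_flushed: false,
    };
    binary_copy.finish();
    assert!(binary_copy.footer_written && !binary_copy.columns_flushed);
    println!("footer written without flushing columns");
}
```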
```rust
match object_store::aws::resolve_bucket_region(bucket, &client_options).await {
    Ok(bucket_region) => Ok(Some(bucket_region)),
    Err(e) => {
        log::debug!(
            "Failed to resolve S3 bucket region for '{}': {:?}; defaulting to provider chain",
            bucket,
            e
        );
        // Fallback to region provider chain; let downstream choose a default
        Ok(None)
    }
}
```
This change seems unrelated (but is fairly minor). Will let @jackye1995 or @wjones127 weigh in on whether this is the best thing to do here or not.
Not related to this PR. Removed.
```rust
let first_data_file_version = LanceFileVersion::try_from_major_minor(
    fragments[0].files[0].file_major_version,
    fragments[0].files[0].file_minor_version,
)
.map(|v| v.resolve())
.unwrap();
```
I think you can just use `dataset.manifest.data_storage_format`, like the check above.
```rust
let compaction_plan: CompactionPlan = plan_compaction(dataset, &options).await?;

if compaction_plan.tasks().is_empty() && options.enable_binary_copy_force {
```
Why is this an error instead of a no-op?
Actually, it's not needed. This logic has already been removed.
```rust
} else {
    let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?);
    (None, data)
// ...
prepare_reader(
```
Why are you preparing a reader in this case? If we can use binary copy, we shouldn't need a scan, right?
This reader is indeed not needed during the binary copy process. The original design included a safety feature: if any problem caused a panic during the rewrite phase of binary copy, it would roll back to the normal compaction logic, so this object was initialized in advance. On reflection, that design was cumbersome and has been simplified: the logic that initialized the reader in the binary copy path has been removed.
```rust
});
}

if new_fragments.is_empty() {
```
Why would this be empty? Didn't we already verify that binary copy is supported if we reach this point?
As mentioned before, this logic has been removed.
```rust
Ok(())
}

async fn rewrite_files_binary_copy(
```
This (and `flush_footer`) are big methods. We might want some kind of `BinaryCopier` utility struct instead? It could live in its own sub-module, e.g. `lance::dataset::optimize::binary_copy`.
Hi @westonpace, thanks a lot for your review. All comments are addressed. PTAL :)

Hi @westonpace — hope you're doing well! Just a gentle ping on this PR. Your review would be really helpful. When you have time, could you take another look? Appreciate your help!

Hi @jackye1995, would you mind taking a look at your convenience? Really appreciate any feedback. Thanks in advance!
```rust
let mut frag = Fragment::new(0);
let mut df = DataFile::new_unstarted(current_filename.take().unwrap(), maj, min);
// v2_0 vs v2_1+ field-to-column index mapping for the final file
let is_structural = version >= LanceFileVersion::V2_1;
```
this is duplicated logic with L378
```rust
if total_rows_in_current > 0 {
    // Flush remaining rows as a final output file
    // v2_0 compatibility: same single-page enforcement applies for the final file close
```
this is duplicate logic with L346
```rust
for i in 0..count {
    let addr =
        lance_core::utils::address::RowAddress::new_from_parts(frag_id, i as u32);
    addrs.insert(u64::from(addr));
```
This would be pretty inefficient; can we use `insert_range`?
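The reason a range insert suffices here: a Lance row address packs the fragment id into the high 32 bits and the row offset into the low 32 bits, so rows `0..count` of one fragment form a single contiguous `u64` range. A stdlib-only sketch of the arithmetic, with hypothetical helper names (the real code would pass the range to a set type with `insert_range`, such as the `roaring` crate's bitmaps):

```rust
// Rows 0..count of a fragment occupy one contiguous u64 address range, so a
// single range insertion can replace `count` individual inserts.
fn row_address(frag_id: u32, row: u32) -> u64 {
    ((frag_id as u64) << 32) | row as u64
}

fn fragment_row_range(frag_id: u32, count: u32) -> std::ops::Range<u64> {
    let start = row_address(frag_id, 0);
    start..start + count as u64
}

fn main() {
    let range = fragment_row_range(7, 100);
    assert_eq!(range.start, 7u64 << 32);
    assert_eq!(range.end - range.start, 100);
    // Equivalent to inserting row_address(7, i) for each i in 0..100.
    assert!(range.contains(&row_address(7, 99)));
    assert!(!range.contains(&row_address(7, 100)));
    println!("range covers {} addresses", range.end - range.start);
}
```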
```rust
version: LanceFileVersion,
) -> Result<()> {
    let pos = writer.tell().await? as u64;
    let _new_pos = apply_alignment_padding(&mut writer, pos, version).await?;
```
Why is the result discarded?
`apply_alignment_padding` may write zero-byte padding via `writer.write_all(...)`, which advances the writer's internal position. The `u64` it returns (`new_pos`) is just the theoretical new position after writing the padding. This value is not needed for subsequent calculations in `flush_footer` (`FileWriter::finish()` starts writing the footer directly from the writer's current position), so the return value is unused.
(See `lance/rust/lance-file/src/writer.rs`, line 605 at e7540d7.)
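The padding arithmetic being discussed can be sketched with plain integers: pad the current position up to the next multiple of the alignment by emitting zero bytes. The function names and the in-memory `Vec<u8>` "writer" here are illustrative; the real `apply_alignment_padding` writes through the async file writer and derives the alignment from the file version.

```rust
// Bytes needed to round `pos` up to the next multiple of `align`.
fn padding_needed(pos: u64, align: u64) -> u64 {
    (align - pos % align) % align
}

// Illustrative padding writer over an in-memory buffer.
fn apply_alignment_padding(buf: &mut Vec<u8>, align: u64) -> u64 {
    let pad = padding_needed(buf.len() as u64, align);
    buf.extend(std::iter::repeat(0u8).take(pad as usize));
    buf.len() as u64 // the new position; callers may legitimately ignore it
}

fn main() {
    let mut buf = vec![1u8; 10];
    let new_pos = apply_alignment_padding(&mut buf, 8);
    assert_eq!(new_pos, 16);
    // Already aligned: no further padding needed.
    assert_eq!(padding_needed(16, 8), 0);
    println!("padded to {new_pos}");
}
```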
```rust
Err(_) => return false,
};
// Capture schema mapping baseline from first data file
let ref_fields = &fragments[0].files[0].fields;
```
Edge case: to be safe, we should check `!fragments[0].files.is_empty()`.
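One way to handle that edge case is `Option` chaining instead of direct indexing, so an empty `files` vec yields `None` rather than a panic. A stdlib-only sketch with hypothetical mirror types (the real `Fragment`/`DataFile` live in lance's table format crate):

```rust
// Hypothetical mirrors of the fragment/data-file shape.
struct DataFile {
    fields: Vec<i32>,
}

struct Fragment {
    files: Vec<DataFile>,
}

// fragments[0].files[0] would panic on an empty `files` vec; chained
// `first()` calls turn the edge case into a clean `None`.
fn ref_fields(fragments: &[Fragment]) -> Option<&Vec<i32>> {
    fragments.first()?.files.first().map(|f| &f.fields)
}

fn main() {
    let empty = vec![Fragment { files: vec![] }];
    assert!(ref_fields(&empty).is_none()); // no panic on the edge case

    let ok = vec![Fragment {
        files: vec![DataFile { fields: vec![0, 1, 2] }],
    }];
    assert_eq!(ref_fields(&ok).map(|f| f.len()), Some(3));
    println!("ok");
}
```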
```rust
/// - `fragments`: fragments to merge via binary copy (assumed consistent versions).
/// - `params`: write parameters (uses `max_rows_per_file`).
/// - `read_batch_bytes_opt`: optional I/O batch size when coalescing page reads.
pub async fn rewrite_files_binary_copy(
```
As we discussed in the original GitHub issue, we should reject binary copy if the Lance file has a global buffer. We can do that check after reading `file_meta`.
Nice catch. Added global buffer checking in the `can_use_binary_copy` function.
Hi @jackye1995, just a gentle reminder — could you please take another look when it's convenient? Thanks!
westonpace left a comment:
I'm fine moving forwards with this. I'll give a bit for @jackye1995 to weigh in or any comments to be addressed.
I think this is something of a niche use case, because we generally want the underlying data pages to be large, but there are cases where it is useful (compacting large files into huge files, or search-only use cases where everything is random access and we don't care as much about page size).
It doesn't add too much complexity (mainly some if/else checks in the compaction code) and the rest is hidden in a dedicated module.
Suggested change:

```diff
  /// Whether to enable binary copy optimization when eligible.
+ ///
+ /// This skips re-encoding the data and can lead to faster compaction
+ /// times. However, it cannot merge pages together and should not be
+ /// used when compacting small files together because the pages in the
+ /// compacted file will be too small and this could lead to poor I/O patterns.
+ ///
  /// Defaults to false.
  pub enable_binary_copy: bool,
```
```rust
for fragment in fragments {
    if fragment.deletion_file.is_some() {
        return Ok(false);
```
Whenever we return `false`, it might be nice to log a debug message explaining why.
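One way to make every rejection self-explaining is to route all the `return false` paths through a small helper that records the reason. A sketch under that assumption; `eprintln!` stands in for `log::debug!` to keep it dependency-free, and the two boolean parameters are stand-ins for the real per-fragment checks:

```rust
// Helper: log the rejection reason, then reject.
fn reject(reason: &str) -> bool {
    eprintln!("binary copy not eligible: {reason}");
    false
}

// Stand-in for can_use_binary_copy: each ineligibility path names its reason.
fn can_use_binary_copy(has_deletion_file: bool, extra_global_buffers: bool) -> bool {
    if has_deletion_file {
        return reject("fragment has a deletion file");
    }
    if extra_global_buffers {
        return reject("input file carries extra global buffers");
    }
    true
}

fn main() {
    assert!(!can_use_binary_copy(true, false));
    assert!(can_use_binary_copy(false, false));
}
```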
```rust
// Binary copy only preserves page and column-buffer bytes. The output file's footer
// (including global buffers) is re-generated, not copied from inputs.
//
// Therefore, we reject input files that contain any additional global buffers beyond
// the required schema / file descriptor global buffer (global buffer index 0).
if file_meta.file_buffers.len() > 1 {
    return Ok(false);
}
```
At some point I think we are going to start writing file stats in the footer which might interfere with this check.
Hi @westonpace, thanks a lot for your help. All comments are addressed and CI passed :) cc @jackye1995
Hi @jackye1995, any concerns?
Closes: #5433