diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index 4952d9476c4..d4bdd996573 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -495,6 +495,26 @@ impl FileWriter { self.schema_metadata.insert(key.into(), value.into()); } + /// Prepare the writer when column data and metadata were produced externally. + /// + /// This is useful for flows that copy already-encoded pages (e.g., binary copy + /// during compaction) where the column buffers have been written directly and we + /// only need to write the footer and schema metadata. The provided + /// `column_metadata` must describe the buffers already persisted by the + /// underlying `ObjectWriter`, and `rows_written` should reflect the total number + /// of rows in those buffers. + pub fn initialize_with_external_metadata( + &mut self, + schema: lance_core::datatypes::Schema, + column_metadata: Vec, + rows_written: u64, + ) { + self.schema = Some(schema); + self.num_columns = column_metadata.len() as u32; + self.column_metadata = column_metadata; + self.rows_written = rows_written; + } + /// Adds a global buffer to the file /// /// The global buffer can contain any arbitrary bytes. It will be written to the disk @@ -595,7 +615,9 @@ impl FileWriter { .collect::>(); self.write_pages(encoding_tasks).await?; - self.finish_writers().await?; + if !self.column_writers.is_empty() { + self.finish_writers().await?; + } // 3. 
write global buffers (we write the schema here) let global_buffer_offsets = self.write_global_buffers().await?; diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index d733cad64c4..321fa4dfa27 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -91,6 +91,7 @@ use super::rowids::load_row_id_sequences; use super::transaction::{Operation, RewriteGroup, RewrittenIndex, Transaction}; use super::utils::make_rowid_capture_stream; use super::{write_fragments_internal, WriteMode, WriteParams}; +use crate::dataset::utils::CapturedRowIds; use crate::io::commit::{commit_transaction, migrate_fragments}; use crate::Dataset; use crate::Result; @@ -109,10 +110,12 @@ use serde::{Deserialize, Serialize}; use snafu::location; use tracing::info; +mod binary_copy; pub mod remapping; use crate::index::frag_reuse::build_new_frag_reuse_index; use crate::io::deletion::read_dataset_deletion_file; +use binary_copy::rewrite_files_binary_copy; pub use remapping::{IgnoreRemap, IndexRemapper, IndexRemapperOptions, RemappedIndex}; /// Options to be passed to [compact_files]. @@ -156,6 +159,23 @@ pub struct CompactionOptions { /// not be remapped during this compaction operation. Instead, the fragment reuse index /// is updated and will be used to perform remapping later. pub defer_index_remap: bool, + /// Whether to enable binary copy optimization when eligible. + /// + /// This skips re-encoding the data and can lead to faster compaction + /// times. However, it cannot merge pages together and should not be + /// used when compacting small files together because the pages in the + /// compacted file will be too small and this could lead to poor I/O patterns. + /// + /// Defaults to false. + pub enable_binary_copy: bool, + /// Whether to force binary copy optimization. If true, compaction will fail + /// if binary copy is not supported for the given fragments. + /// Defaults to false. 
+ pub enable_binary_copy_force: bool, + /// The batch size in bytes for reading during binary copy operations. + /// Controls how much data is read at once when performing binary copy. + /// Defaults to 16MB (16 * 1024 * 1024). + pub binary_copy_read_batch_bytes: Option, } impl Default for CompactionOptions { @@ -170,6 +190,9 @@ impl Default for CompactionOptions { max_bytes_per_file: None, batch_size: None, defer_index_remap: false, + enable_binary_copy: false, + enable_binary_copy_force: false, + binary_copy_read_batch_bytes: Some(16 * 1024 * 1024), } } } @@ -183,6 +206,149 @@ impl CompactionOptions { } } +/// Determine if page-level binary copy can safely merge the provided fragments. +/// +/// Preconditions checked in order: +/// - Feature flag `enable_binary_copy` is enabled +/// - Dataset storage format is non-legacy +/// - Fragment list is non-empty +/// - All data files share identical Lance file versions +/// - No fragment has a deletion file +/// TODO: Need to support schema evolution case like add column and drop column +/// - All data files share identical schema mappings (`fields`, `column_indices`) +/// - Input data files must not contain extra global buffers (beyond schema / file descriptor) +async fn can_use_binary_copy( + dataset: &Dataset, + options: &CompactionOptions, + fragments: &[Fragment], +) -> bool { + can_use_binary_copy_impl(dataset, options, fragments) + .await + .unwrap_or_else(|err| { + log::warn!("Binary copy disabled due to error: {}", err); + false + }) +} + +async fn can_use_binary_copy_impl( + dataset: &Dataset, + options: &CompactionOptions, + fragments: &[Fragment], +) -> Result { + use lance_file::reader::FileReader as LFReader; + use lance_file::version::LanceFileVersion; + use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; + + if !options.enable_binary_copy { + log::debug!("Binary copy disabled: enable_binary_copy config is false"); + return Ok(false); + } + + let has_blob_columns = dataset + .schema() + 
.fields_pre_order() + .any(|field| field.is_blob()); + if has_blob_columns { + log::debug!("Binary copy disabled: dataset contains blob columns"); + return Ok(false); + } + + let storage_ok = dataset + .manifest + .data_storage_format + .lance_file_version() + .map(|v| !matches!(v.resolve(), LanceFileVersion::Legacy)) + .unwrap_or(false); + if !storage_ok { + log::debug!("Binary copy disabled: dataset uses legacy storage format"); + return Ok(false); + } + + if fragments.is_empty() { + log::debug!("Binary copy disabled: no fragments to compact"); + return Ok(false); + } + + let storage_file_version = dataset + .manifest + .data_storage_format + .lance_file_version()? + .resolve(); + + if fragments[0].files.is_empty() { + log::debug!( + "Binary copy disabled: fragment {} has no data files", + fragments[0].id + ); + return Ok(false); + } + let ref_fields = &fragments[0].files[0].fields; + let ref_cols = &fragments[0].files[0].column_indices; + let mut is_same_version = true; + + for fragment in fragments { + if fragment.deletion_file.is_some() { + log::debug!( + "Binary copy disabled: fragment {} has a deletion file", + fragment.id + ); + return Ok(false); + } + + for data_file in &fragment.files { + let version_ok = LanceFileVersion::try_from_major_minor( + data_file.file_major_version, + data_file.file_minor_version, + ) + .map(|v| v.resolve()) + .is_ok_and(|v| v == storage_file_version); + + if !version_ok { + is_same_version = false; + } + if data_file.fields != *ref_fields || data_file.column_indices != *ref_cols { + return Ok(false); + } + + // check file global buffer + let object_store = match data_file.base_id { + Some(base_id) => dataset.object_store_for_base(base_id).await?, + None => dataset.object_store.clone(), + }; + let full_path = dataset + .data_file_dir(data_file)? 
+ .child(data_file.path.as_str()); + let scan_scheduler = ScanScheduler::new( + object_store.clone(), + SchedulerConfig::max_bandwidth(&object_store), + ); + let file_scheduler = scan_scheduler + .open_file_with_priority(&full_path, 0, &data_file.file_size_bytes) + .await?; + let file_meta = LFReader::read_all_metadata(&file_scheduler).await?; + // Binary copy only preserves page and column-buffer bytes. The output file's footer + // (including global buffers) is re-generated, not copied from inputs. + // + // Therefore, we reject input files that contain any additional global buffers beyond + // the required schema / file descriptor global buffer (global buffer index 0). + if file_meta.file_buffers.len() > 1 { + log::debug!( + "Binary copy disabled: data file has extra global buffers (len={})", + file_meta.file_buffers.len() + ); + return Ok(false); + } + } + } + + if !is_same_version { + log::debug!("Binary copy disabled: data files use different file versions"); + return Ok(false); + } + + Ok(true) +} + /// Metrics returned by [compact_files]. #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct CompactionMetrics { @@ -488,6 +654,64 @@ impl CompactionPlan { } } +/// Build a scan reader for rewrite and optionally capture row IDs. +/// +/// Parameters: +/// - `dataset`: Dataset handle used to create the scanner. +/// - `fragments`: When `with_frags` is true, restrict the scan to these old fragments +/// and preserve insertion order. +/// - `batch_size`: Optional batch size; if provided, set it on the scanner to control +/// read batching. +/// - `with_frags`: Whether to scan only the specified old fragments and force +/// in-order reading. +/// - `capture_row_ids`: When index remapping is needed, include and capture the +/// `_rowid` column from the stream. +/// +/// Returns: +/// - `SendableRecordBatchStream`: The batch stream (with `_rowid` removed if captured) +/// to feed the rewrite path. 
+/// - `Option>`: A receiver to obtain captured row IDs after the +/// stream completes; `None` if not capturing. +async fn prepare_reader( + dataset: &Dataset, + fragments: &[Fragment], + batch_size: Option, + with_frags: bool, + capture_row_ids: bool, +) -> Result<( + SendableRecordBatchStream, + Option>, +)> { + let mut scanner = dataset.scan(); + let has_blob_columns = dataset + .schema() + .fields_pre_order() + .any(|field| field.is_blob()); + if has_blob_columns { + scanner.blob_handling(BlobHandling::AllBinary); + } + if let Some(bs) = batch_size { + scanner.batch_size(bs); + } + if with_frags { + scanner + .with_fragments(fragments.to_vec()) + .scan_in_order(true); + } + if capture_row_ids { + scanner.with_row_id(); + let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?); + let (data_no_row_ids, rx) = + make_rowid_capture_stream(data, dataset.manifest.uses_stable_row_ids())?; + Ok((data_no_row_ids, Some(rx))) + } else { + Ok(( + SendableRecordBatchStream::from(scanner.try_into_stream().await?), + None, + )) + } +} + /// A single group of fragments to compact, which is a view into the compaction /// plan. We keep the `replace_range` indices so we can map the result of the /// compact back to the fragments it replaces. @@ -725,18 +949,7 @@ async fn rewrite_files( .sum::(); // If we aren't using stable row ids, then we need to remap indices. 
let needs_remapping = !dataset.manifest.uses_stable_row_ids(); - let mut scanner = dataset.scan(); - let has_blob_columns = dataset - .schema() - .fields_pre_order() - .any(|field| field.is_blob()); - if has_blob_columns { - scanner.blob_handling(BlobHandling::AllBinary); - } - if let Some(batch_size) = options.batch_size { - scanner.batch_size(batch_size); - } - // Generate an ID for logging purposes + let mut new_fragments: Vec; let task_id = uuid::Uuid::new_v4(); log::info!( "Compaction task {}: Begin compacting {} rows across {} fragments", @@ -744,32 +957,43 @@ async fn rewrite_files( num_rows, fragments.len() ); - scanner - .with_fragments(fragments.clone()) - .scan_in_order(true); - let (row_ids_rx, reader) = if needs_remapping { - scanner.with_row_id(); - let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?); - let (data_no_row_ids, row_id_rx) = - make_rowid_capture_stream(data, dataset.manifest.uses_stable_row_ids())?; - (Some(row_id_rx), data_no_row_ids) - } else { - let data = SendableRecordBatchStream::from(scanner.try_into_stream().await?); - (None, data) - }; + let can_binary_copy = can_use_binary_copy(dataset.as_ref(), options, &fragments).await; + if !can_binary_copy && options.enable_binary_copy_force { + return Err(Error::NotSupported { + source: format!("compaction task {}: binary copy is not supported", task_id).into(), + location: location!(), + }); + } + let mut row_ids_rx: Option> = None; + let mut reader: Option = None; - let mut rows_read = 0; - let schema = reader.schema(); - let reader = reader.inspect_ok(move |batch| { - rows_read += batch.num_rows(); - log::info!( - "Compaction task {}: Read progress {}/{}", - task_id, - rows_read, - num_rows, - ); - }); - let reader = Box::pin(RecordBatchStreamAdapter::new(schema, reader)); + if !can_binary_copy { + let (prepared_reader, rx_initial) = prepare_reader( + dataset.as_ref(), + &fragments, + options.batch_size, + true, + needs_remapping, + ) + .await?; + row_ids_rx = 
rx_initial; + + let mut rows_read = 0; + let schema = prepared_reader.schema(); + let reader_with_progress = prepared_reader.inspect_ok(move |batch| { + rows_read += batch.num_rows(); + log::info!( + "Compaction task {}: Read progress {}/{}", + task_id, + rows_read, + num_rows, + ); + }); + reader = Some(Box::pin(RecordBatchStreamAdapter::new( + schema, + reader_with_progress, + ))); + } let mut params = WriteParams { max_rows_per_file: options.target_rows_per_fragment, @@ -785,16 +1009,56 @@ async fn rewrite_files( params.enable_stable_row_ids = true; } - let (mut new_fragments, _) = write_fragments_internal( - Some(dataset.as_ref()), - dataset.object_store.clone(), - &dataset.base, - dataset.schema().clone(), - reader, - params, - None, // Compaction doesn't use target_bases - ) - .await?; + if can_binary_copy { + new_fragments = rewrite_files_binary_copy( + dataset.as_ref(), + &fragments, + ¶ms, + options.binary_copy_read_batch_bytes, + ) + .await?; + + if new_fragments.is_empty() && options.enable_binary_copy_force { + return Err(Error::NotSupported { + source: format!("compaction task {}: binary copy is not supported", task_id).into(), + location: location!(), + }); + } + + if needs_remapping { + let (tx, rx) = std::sync::mpsc::channel(); + let mut addrs = RoaringTreemap::new(); + for frag in &fragments { + let frag_id = frag.id as u32; + let count = u64::try_from(frag.physical_rows.unwrap_or(0)).map_err(|_| { + Error::Internal { + message: format!( + "Fragment {} has too many physical rows to represent as row addresses", + frag.id + ), + location: location!(), + } + })?; + let start = u64::from(lance_core::utils::address::RowAddress::first_row(frag_id)); + addrs.insert_range(start..start + count); + } + let captured = CapturedRowIds::AddressStyle(addrs); + let _ = tx.send(captured); + row_ids_rx = Some(rx); + } + } else { + let (frags, _) = write_fragments_internal( + Some(dataset.as_ref()), + dataset.object_store.clone(), + &dataset.base, + 
dataset.schema().clone(), + reader.expect("reader must be prepared for non-binary-copy path"), + params, + None, + ) + .await?; + new_fragments = frags; + } log::info!("Compaction task {}: file written", task_id); @@ -821,9 +1085,9 @@ async fn rewrite_files( (Some(row_id_map), None) } } else { - log::info!("Compaction task {}: rechunking stable row ids", task_id); - rechunk_stable_row_ids(dataset.as_ref(), &mut new_fragments, &fragments).await?; if dataset.manifest.uses_stable_row_ids() { + log::info!("Compaction task {}: rechunking stable row ids", task_id); + rechunk_stable_row_ids(dataset.as_ref(), &mut new_fragments, &fragments).await?; recalc_versions_for_rewritten_fragments( dataset.as_ref(), &mut new_fragments, @@ -1132,6 +1396,7 @@ pub async fn commit_compaction( #[cfg(test)] mod tests { + mod binary_copy; use self::remapping::RemappedIndex; use super::*; use crate::dataset::index::frag_reuse::cleanup_frag_reuse_index; @@ -1140,7 +1405,7 @@ mod tests { use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index}; use crate::index::vector::{StageParams, VectorIndexParams}; use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; - use arrow_array::types::{Float32Type, Int32Type, Int64Type}; + use arrow_array::types::{Float32Type, Float64Type, Int32Type, Int64Type}; use arrow_array::{ ArrayRef, Float32Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, PrimitiveArray, RecordBatch, RecordBatchIterator, @@ -1155,7 +1420,9 @@ mod tests { use lance_datagen::Dimension; use lance_file::version::LanceFileVersion; use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME; - use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams}; + use lance_index::scalar::{ + BuiltinIndexType, FullTextSearchQuery, InvertedIndexParams, ScalarIndexParams, + }; use lance_index::vector::ivf::IvfBuildParams; use lance_index::vector::pq::PQBuildParams; use lance_index::{Index, IndexType}; diff --git 
a/rust/lance/src/dataset/optimize/binary_copy.rs b/rust/lance/src/dataset/optimize/binary_copy.rs new file mode 100644 index 00000000000..2a51e8aca9b --- /dev/null +++ b/rust/lance/src/dataset/optimize/binary_copy.rs @@ -0,0 +1,577 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use crate::dataset::fragment::write::generate_random_filename; +use crate::dataset::WriteParams; +use crate::dataset::DATA_DIR; +use crate::datatypes::Schema; +use crate::Dataset; +use crate::Result; +use lance_arrow::DataTypeExt; +use lance_core::Error; +use lance_encoding::decoder::{ColumnInfo, PageEncoding, PageInfo as DecPageInfo}; +use lance_encoding::version::LanceFileVersion; +use lance_file::format::pbfile; +use lance_file::reader::FileReader as LFReader; +use lance_file::writer::{FileWriter, FileWriterOptions}; +use lance_io::object_writer::ObjectWriter; +use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; +use lance_io::traits::Writer; +use lance_table::format::{DataFile, Fragment}; +use prost::Message; +use prost_types::Any; +use snafu::location; +use std::ops::Range; +use std::sync::Arc; +use tokio::io::AsyncWriteExt; + +const ALIGN: usize = 64; + +/// Apply 64-byte alignment padding for V2.1+ files. +/// +/// For V2.1+, writes padding bytes to align the current position to a 64-byte boundary. +/// For V2.0 and earlier, no padding is applied as alignment is not required. +/// +/// Returns the new position after padding (if any). 
+async fn apply_alignment_padding( + writer: &mut ObjectWriter, + current_pos: u64, + version: LanceFileVersion, +) -> Result { + if version >= LanceFileVersion::V2_1 { + static ZERO_BUFFER: std::sync::OnceLock> = std::sync::OnceLock::new(); + let zero_buf = ZERO_BUFFER.get_or_init(|| vec![0u8; ALIGN]); + + let pad = (ALIGN - (current_pos as usize % ALIGN)) % ALIGN; + if pad != 0 { + writer.write_all(&zero_buf[..pad]).await?; + return Ok(current_pos + pad as u64); + } + } + Ok(current_pos) +} + +async fn init_writer_if_necessary( + dataset: &Dataset, + current_writer: &mut Option, + current_filename: &mut Option, +) -> Result { + if current_writer.is_none() { + let filename = format!("{}.lance", generate_random_filename()); + let path = dataset.base.child(DATA_DIR).child(filename.as_str()); + let writer = dataset.object_store.create(&path).await?; + *current_writer = Some(writer); + *current_filename = Some(filename); + return Ok(true); + } + Ok(false) +} + +/// v2_0 vs v2_1+ field-to-column index mapping +/// - v2_1+ stores only leaf columns; non-leaf fields get `-1` in the mapping +/// - v2_0 includes structural headers as columns; non-leaf fields map to a concrete index +fn compute_field_column_indices( + schema: &Schema, + full_field_ids_len: usize, + version: LanceFileVersion, +) -> Vec { + let is_structural = version >= LanceFileVersion::V2_1; + let mut field_column_indices: Vec = Vec::with_capacity(full_field_ids_len); + let mut curr_col_idx: i32 = 0; + for field in schema.fields_pre_order() { + if field.is_packed_struct() || field.is_leaf() || !is_structural { + field_column_indices.push(curr_col_idx); + curr_col_idx += 1; + } else { + field_column_indices.push(-1); + } + } + field_column_indices +} + +/// Finalize the current output file and return it as a single [Fragment]. +/// - Ensures an output writer / filename is present (creates a new file if needed). 
+/// - Converts the in-memory `col_pages` / `col_buffers` into `ColumnInfo` metadata, draining them. +/// - Applies v2_0 structural header rules (single page, normalized `num_rows` and `priority`). +/// - Writes the Lance footer via [flush_footer] and registers the resulting [DataFile] in a [Fragment]. +/// +/// PAY ATTENTION current function will: +/// - Takes (`Option::take`) the current writer and filename. +/// - Drains `col_pages` and `col_buffers` for all columns. +#[allow(clippy::too_many_arguments)] +async fn finalize_current_output_file( + schema: &Schema, + full_field_ids: &[i32], + current_writer: &mut Option, + current_filename: &mut Option, + current_page_table: &[ColumnInfo], + col_pages: &mut [Vec], + col_buffers: &mut [Vec<(u64, u64)>], + is_non_leaf_column: &[bool], + total_rows_in_current: u64, + version: LanceFileVersion, +) -> Result { + let mut final_cols: Vec> = Vec::with_capacity(current_page_table.len()); + for (i, column_info) in current_page_table.iter().enumerate() { + let mut pages_vec = std::mem::take(&mut col_pages[i]); + // For v2_0 struct headers, force a single page and set num_rows to total + if version == LanceFileVersion::V2_0 + && is_non_leaf_column.get(i).copied().unwrap_or(false) + && !pages_vec.is_empty() + { + pages_vec[0].num_rows = total_rows_in_current; + pages_vec[0].priority = 0; + pages_vec.truncate(1); + } + let pages_arc = Arc::from(pages_vec.into_boxed_slice()); + let buffers_vec = std::mem::take(&mut col_buffers[i]); + final_cols.push(Arc::new(ColumnInfo::new( + column_info.index, + pages_arc, + buffers_vec, + column_info.encoding.clone(), + ))); + } + let writer = current_writer.take().unwrap(); + flush_footer(writer, schema, &final_cols, total_rows_in_current, version).await?; + + // Register the newly closed output file as a fragment data file + let (maj, min) = version.to_numbers(); + let mut fragment = Fragment::new(0); + let mut data_file = DataFile::new_unstarted(current_filename.take().unwrap(), maj, min); 
+ data_file.fields = full_field_ids.to_vec(); + data_file.column_indices = compute_field_column_indices(schema, full_field_ids.len(), version); + fragment.files.push(data_file); + fragment.physical_rows = Some(total_rows_in_current as usize); + Ok(fragment) +} + +/// Rewrite the files in a single task using binary copy semantics. +/// +/// Flow overview (per task): +/// fragments +/// └── data files +/// └── columns +/// └── pages (batched reads) -> aligned writes -> page metadata +/// └── column buffers -> aligned writes -> buffer metadata +/// └── flush when target rows reached -> write footer -> fragment metadata +/// └── final flush for remaining rows +/// +/// Behavior highlights: +/// - Assumes all input files share the same Lance file version; version drives column-count +/// calculation (v2.0 includes structural headers, v2.1+ only leaf columns). +/// - Preserves stable row ids by concatenating row-id sequences when enabled. +/// - Enforces 64-byte alignment for page and buffer writes in V2.1+ files (V2.0 does not require alignment). +/// - For v2.0, preserves single-page structural headers and normalizes their row counts/priority. +/// - Flushes an output file once `max_rows_per_file` rows are accumulated, then repeats. +/// +/// Parameters: +/// - `dataset`: target dataset (for storage/config and schema). +/// - `fragments`: fragments to merge via binary copy (assumed consistent versions). +/// - `params`: write parameters (uses `max_rows_per_file`). +/// - `read_batch_bytes_opt`: optional I/O batch size when coalescing page reads. 
+pub async fn rewrite_files_binary_copy( + dataset: &Dataset, + fragments: &[Fragment], + params: &WriteParams, + read_batch_bytes_opt: Option, +) -> Result> { + if fragments.is_empty() || fragments.iter().any(|fragment| fragment.files.is_empty()) { + return Err(Error::invalid_input( + "binary copy requires at least one data file", + location!(), + )); + } + + // Binary copy algorithm overview: + // - Reads page and buffer regions directly from source files in bounded batches + // - Appends them to a new output file with alignment, updating offsets + // - Recomputes page priorities by adding the cumulative row count to preserve order + // - For v2_0, enforces single-page structural header columns when closing a file + // - Writes a new footer (schema descriptor, column metadata, offset tables, version) + // - Optionally carries forward stable row ids and persists them inline in fragment metadata + // Merge small Lance files into larger ones by page-level binary copy. + let schema = dataset.schema().clone(); + let full_field_ids = schema.field_ids(); + + // The previous checks have ensured that the file versions of all files are consistent. + let version = LanceFileVersion::try_from_major_minor( + fragments[0].files[0].file_major_version, + fragments[0].files[0].file_minor_version, + ) + .unwrap() + .resolve(); + // v2.0 and v2.1+ handle structural headers differently during file writing: + // - v2_0 materializes ALL fields in pre-order traversal (leaf fields + non-leaf struct headers), + // which means the ColumnInfo set includes all fields in pre-order traversal. + // - v2_1+ materializes fields that are either leaf columns OR packed structs. Non-leaf structural + // headers (unpacked structs with children) are not stored as columns. + // As a result, the ColumnInfo set contains leaf fields and packed structs. 
+ // To correctly align copy layout, we derive `column_count` by version: + // - v2_0: use total number of fields in pre-order (leaf + non-leaf headers) + // - v2_1+: use only the number of leaf fields plus packed structs + let column_count = if version == LanceFileVersion::V2_0 { + schema.fields_pre_order().count() + } else { + schema + .fields_pre_order() + .filter(|f| f.is_packed_struct() || f.is_leaf()) + .count() + }; + + // v2_0 compatibility: build a map to identify non-leaf structural header columns + // - In v2_0 these headers exist as columns and must have a single page + // - In v2_1+ these headers are not stored as columns and this map is unused + let mut is_non_leaf_column: Vec = vec![false; column_count]; + if version == LanceFileVersion::V2_0 { + for (col_idx, field) in schema.fields_pre_order().enumerate() { + // Only mark non-packed Struct fields (lists remain as leaf data carriers) + let is_non_leaf = field.data_type().is_struct() && !field.is_packed_struct(); + is_non_leaf_column[col_idx] = is_non_leaf; + } + } + + let mut out: Vec = Vec::new(); + let mut current_writer: Option = None; + let mut current_filename: Option = None; + let mut current_pos: u64 = 0; + let mut current_page_table: Vec = Vec::new(); + // Baseline column encodings captured from the first source file; all subsequent + // files must match per-column to safely concatenate column-level buffers. 
+ let mut baseline_col_encoding_bytes: Vec> = Vec::new(); + + // Column-list> + let mut col_pages: Vec> = std::iter::repeat_with(Vec::::new) + .take(column_count) + .collect(); + let mut col_buffers: Vec> = vec![Vec::new(); column_count]; + let mut total_rows_in_current: u64 = 0; + let max_rows_per_file = params.max_rows_per_file as u64; + + // Visit each fragment and all of its data files (a fragment may contain multiple files) + for frag in fragments.iter() { + for df in frag.files.iter() { + let object_store = if let Some(base_id) = df.base_id { + dataset.object_store_for_base(base_id).await? + } else { + dataset.object_store.clone() + }; + let full_path = dataset.data_file_dir(df)?.child(df.path.as_str()); + let scan_scheduler = ScanScheduler::new( + object_store.clone(), + SchedulerConfig::max_bandwidth(&object_store), + ); + let file_scheduler = scan_scheduler + .open_file_with_priority(&full_path, 0, &df.file_size_bytes) + .await?; + let file_meta = LFReader::read_all_metadata(&file_scheduler).await?; + let src_column_infos = file_meta.column_infos.clone(); + // Initialize current_page_table + if current_page_table.is_empty() { + current_page_table = src_column_infos + .iter() + .map(|column_index| ColumnInfo { + index: column_index.index, + buffer_offsets_and_sizes: Arc::from( + Vec::<(u64, u64)>::new().into_boxed_slice(), + ), + page_infos: Arc::from(Vec::::new().into_boxed_slice()), + encoding: column_index.encoding.clone(), + }) + .collect(); + baseline_col_encoding_bytes = src_column_infos + .iter() + .map(|ci| Any::from_msg(&ci.encoding).unwrap().encode_to_vec()) + .collect(); + } + + // Iterate through each column of the current data file of the current fragment + for (col_idx, src_column_info) in src_column_infos.iter().enumerate() { + // v2_0 compatibility: special handling for non-leaf structural header columns + // - v2_0 expects structural header columns to have a SINGLE page; they carry layout + // metadata only and are not true data carriers. 
+ // - When merging multiple input files via binary copy, naively appending pages would + // yield multiple pages for the same structural header column, violating v2_0 rules. + // - To preserve v2_0 invariants, we skip pages beyond the first one for these columns. + // - During finalization we also normalize the single remaining page’s `num_rows` to the + // total number of rows in the output file and reset `priority` to 0. + // - For v2_1+ this logic does not apply because non-leaf headers are not stored as columns. + let is_non_leaf = col_idx < is_non_leaf_column.len() && is_non_leaf_column[col_idx]; + if is_non_leaf && !col_pages[col_idx].is_empty() { + continue; + } + + if init_writer_if_necessary(dataset, &mut current_writer, &mut current_filename) + .await? + { + current_pos = 0; + } + + let read_batch_bytes: u64 = read_batch_bytes_opt.unwrap_or(16 * 1024 * 1024) as u64; + + let mut page_index = 0; + + // Iterate through each page of the current column in the current data file of the current fragment + while page_index < src_column_info.page_infos.len() { + let mut batch_ranges: Vec> = Vec::new(); + let mut batch_counts: Vec = Vec::new(); + let mut batch_bytes: u64 = 0; + let mut batch_pages: usize = 0; + // Build a single read batch by coalescing consecutive pages up to + // `read_batch_bytes` budget: + // - Accumulate total bytes (`batch_bytes`) and page count (`batch_pages`). + // - For each page, append its buffer ranges to `batch_ranges` and record + // the number of buffers in `batch_counts` so returned bytes can be + // mapped back to page boundaries. + // - Stop when adding the next page would exceed the byte budget, then + // issue one I/O request for the collected ranges. + // - Advance `page_index` to reflect pages scheduled in this batch. + for current_page in &src_column_info.page_infos[page_index..] 
{ + let page_bytes: u64 = current_page + .buffer_offsets_and_sizes + .iter() + .map(|(_, size)| *size) + .sum(); + let would_exceed = + batch_pages > 0 && (batch_bytes + page_bytes > read_batch_bytes); + if would_exceed { + break; + } + batch_counts.push(current_page.buffer_offsets_and_sizes.len()); + for (offset, size) in current_page.buffer_offsets_and_sizes.iter() { + batch_ranges.push((*offset)..(*offset + *size)); + } + batch_bytes += page_bytes; + batch_pages += 1; + page_index += 1; + } + + let bytes_vec = if batch_ranges.is_empty() { + Vec::new() + } else { + // read many buffers at once + file_scheduler.submit_request(batch_ranges, 0).await? + }; + let mut bytes_iter = bytes_vec.into_iter(); + + for (local_idx, buffer_count) in batch_counts.iter().enumerate() { + // Reconstruct the absolute page index within the source column: + // - `page_index` now points to the page position + // - `batch_pages` is how many pages we included in this batch + // - `local_idx` enumerates pages inside the batch [0..batch_pages) + // Therefore `page_index - batch_pages + local_idx` yields the exact + // source page we are currently materializing, allowing us to access + // its metadata (encoding, row count, buffers) for the new page entry. 
+ let page = + &src_column_info.page_infos[page_index - batch_pages + local_idx]; + let mut new_offsets = Vec::with_capacity(*buffer_count); + for _ in 0..*buffer_count { + if let Some(bytes) = bytes_iter.next() { + let writer = current_writer.as_mut().unwrap(); + current_pos = + apply_alignment_padding(writer, current_pos, version).await?; + let start = current_pos; + writer.write_all(&bytes).await?; + current_pos += bytes.len() as u64; + new_offsets.push((start, bytes.len() as u64)); + } + } + + // manual clone encoding + let encoding = if page.encoding.is_structural() { + PageEncoding::Structural(page.encoding.as_structural().clone()) + } else { + PageEncoding::Legacy(page.encoding.as_legacy().clone()) + }; + // `priority` acts as the global row offset for this page, ensuring + // downstream iterators maintain the correct logical order across + // merged inputs. + let new_page_info = DecPageInfo { + num_rows: page.num_rows, + priority: page.priority + total_rows_in_current, + encoding, + buffer_offsets_and_sizes: Arc::from(new_offsets.into_boxed_slice()), + }; + col_pages[col_idx].push(new_page_info); + } + } // finished scheduling & copying pages for this column in the current source file + + if !src_column_info.buffer_offsets_and_sizes.is_empty() { + // Validate column-level encoding compatibility before copying buffers + let src_col_encoding_bytes = Any::from_msg(&src_column_info.encoding) + .unwrap() + .encode_to_vec(); + let baseline_bytes = &baseline_col_encoding_bytes[col_idx]; + if src_col_encoding_bytes != *baseline_bytes { + return Err(Error::Execution { + message: format!( + "binary copy: The ColumnEncoding of column {} is incompatible with the first file, \ + making it impossible to safely concatenate buffers", + col_idx + ), + location: location!(), + }); + } + let ranges: Vec> = src_column_info + .buffer_offsets_and_sizes + .iter() + .map(|(offset, size)| (*offset)..(*offset + *size)) + .collect(); + let bytes_vec = 
file_scheduler.submit_request(ranges, 0).await?; + for bytes in bytes_vec.into_iter() { + let writer = current_writer.as_mut().unwrap(); + current_pos = apply_alignment_padding(writer, current_pos, version).await?; + let start = current_pos; + writer.write_all(&bytes).await?; + current_pos += bytes.len() as u64; + col_buffers[col_idx].push((start, bytes.len() as u64)); + } + } + } // finished all columns in the current source file + + // Accumulate rows for the current output file and flush when reaching the threshold + total_rows_in_current += file_meta.num_rows; + if total_rows_in_current >= max_rows_per_file { + let fragment_out = finalize_current_output_file( + &schema, + &full_field_ids, + &mut current_writer, + &mut current_filename, + &current_page_table, + &mut col_pages, + &mut col_buffers, + &is_non_leaf_column, + total_rows_in_current, + version, + ) + .await?; + + // Reset state for next output file + current_writer = None; + current_pos = 0; + current_page_table.clear(); + for v in col_pages.iter_mut() { + v.clear(); + } + for v in col_buffers.iter_mut() { + v.clear(); + } + out.push(fragment_out); + total_rows_in_current = 0; + } + } + } // Finished writing all fragments; any remaining data in memory will be flushed below + + if total_rows_in_current > 0 { + // Flush remaining rows as a final output file + init_writer_if_necessary(dataset, &mut current_writer, &mut current_filename).await?; + let frag = finalize_current_output_file( + &schema, + &full_field_ids, + &mut current_writer, + &mut current_filename, + &current_page_table, + &mut col_pages, + &mut col_buffers, + &is_non_leaf_column, + total_rows_in_current, + version, + ) + .await?; + out.push(frag); + } + Ok(out) +} + +/// Finalizes a compacted data file by writing the Lance footer via `FileWriter`. +/// +/// This function does not manually craft the footer. Instead it: +/// - Pads the current `ObjectWriter` position to a 64‑byte boundary (required for v2_1+ readers). 
+/// - Converts the collected per‑column info (`final_cols`) into `ColumnMetadata`. +/// - Constructs a `lance_file::writer::FileWriter` with the active `schema`, column metadata, +/// and `total_rows_in_current`. +/// - Calls `FileWriter::finish()` to emit column metadata, offset tables, global buffers +/// (schema descriptor), version, and to close the writer. +/// +/// Preconditions: +/// - All page data and column‑level buffers referenced by `final_cols` have already been written +/// to `writer`; otherwise offsets in the footer will be invalid. +/// +/// Version notes: +/// - v2_0 structural single‑page enforcement is handled when building `final_cols`; this function +/// only performs consistent finalization. +async fn flush_footer( + mut writer: ObjectWriter, + schema: &Schema, + final_cols: &[Arc], + total_rows_in_current: u64, + version: LanceFileVersion, +) -> Result<()> { + let pos = writer.tell().await? as u64; + let _new_pos = apply_alignment_padding(&mut writer, pos, version).await?; + + let mut col_metadatas = Vec::with_capacity(final_cols.len()); + for col in final_cols { + let pages = col + .page_infos + .iter() + .map(|page_info| { + let encoded_encoding = match &page_info.encoding { + PageEncoding::Legacy(array_encoding) => { + Any::from_msg(array_encoding)?.encode_to_vec() + } + PageEncoding::Structural(page_layout) => { + Any::from_msg(page_layout)?.encode_to_vec() + } + }; + let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = page_info + .buffer_offsets_and_sizes + .as_ref() + .iter() + .cloned() + .unzip(); + Ok(pbfile::column_metadata::Page { + buffer_offsets, + buffer_sizes, + encoding: Some(pbfile::Encoding { + location: Some(pbfile::encoding::Location::Direct( + pbfile::DirectEncoding { + encoding: encoded_encoding, + }, + )), + }), + length: page_info.num_rows, + priority: page_info.priority, + }) + }) + .collect::>>()?; + let (buffer_offsets, buffer_sizes): (Vec<_>, Vec<_>) = + col.buffer_offsets_and_sizes.iter().cloned().unzip(); + 
let encoded_col_encoding = Any::from_msg(&col.encoding)?.encode_to_vec(); + let column = pbfile::ColumnMetadata { + pages, + buffer_offsets, + buffer_sizes, + encoding: Some(pbfile::Encoding { + location: Some(pbfile::encoding::Location::Direct(pbfile::DirectEncoding { + encoding: encoded_col_encoding, + })), + }), + }; + col_metadatas.push(column); + } + let mut file_writer = FileWriter::new_lazy( + writer, + FileWriterOptions { + format_version: Some(version), + ..Default::default() + }, + ); + file_writer.initialize_with_external_metadata( + schema.clone(), + col_metadatas, + total_rows_in_current, + ); + file_writer.finish().await?; + Ok(()) +} diff --git a/rust/lance/src/dataset/optimize/tests/binary_copy.rs b/rust/lance/src/dataset/optimize/tests/binary_copy.rs new file mode 100644 index 00000000000..6418b34455f --- /dev/null +++ b/rust/lance/src/dataset/optimize/tests/binary_copy.rs @@ -0,0 +1,774 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use super::*; + +#[tokio::test] +async fn test_binary_copy_merge_small_files() { + for version in LanceFileVersion::iter_non_legacy() { + do_test_binary_copy_merge_small_files(version).await; + } +} + +async fn do_test_binary_copy_merge_small_files(version: LanceFileVersion) { + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + + let data = sample_data(); + let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); + let reader2 = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); + let write_params = WriteParams { + max_rows_per_file: 2_500, + max_rows_per_group: 1_000, + data_storage_version: Some(version), + ..Default::default() + }; + let mut dataset = Dataset::write(reader, test_uri, Some(write_params.clone())) + .await + .unwrap(); + dataset.append(reader2, Some(write_params)).await.unwrap(); + + let before = dataset.scan().try_into_batch().await.unwrap(); + + let options = CompactionOptions { + 
target_rows_per_fragment: 100_000_000, + enable_binary_copy: true, + enable_binary_copy_force: true, + ..Default::default() + }; + let metrics = compact_files(&mut dataset, options, None).await.unwrap(); + assert!(metrics.fragments_added >= 1); + assert_eq!( + dataset.count_rows(None).await.unwrap() as usize, + before.num_rows() + ); + let after = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(before, after); +} + +#[tokio::test] +async fn test_binary_copy_with_defer_remap() { + for version in LanceFileVersion::iter_non_legacy() { + do_test_binary_copy_with_defer_remap(version).await; + } +} + +async fn do_test_binary_copy_with_defer_remap(version: LanceFileVersion) { + use arrow_schema::{DataType, Field, Fields, TimeUnit}; + use lance_datagen::{array, gen_batch, BatchCount, Dimension, RowCount}; + use std::sync::Arc; + + let fixed_list_dt = + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4); + + let meta_fields = Fields::from(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Int32, true), + Field::new("c", fixed_list_dt.clone(), true), + ]); + + let inner_fields = Fields::from(vec![ + Field::new("x", DataType::UInt32, true), + Field::new("y", DataType::LargeUtf8, true), + ]); + let nested_fields = Fields::from(vec![ + Field::new("inner", DataType::Struct(inner_fields.clone()), true), + Field::new("fsb", DataType::FixedSizeBinary(8), true), + ]); + + let event_fields = Fields::from(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new("payload", DataType::Binary, true), + ]); + + let reader = gen_batch() + .col("vec", array::rand_vec::(Dimension::from(16))) + .col("i", array::step::()) + .col("meta", array::rand_struct(meta_fields)) + .col("nested", array::rand_struct(nested_fields)) + .col( + "events", + array::rand_list_any(array::rand_struct(event_fields), true), + ) + .into_reader_rows(RowCount::from(6_000), BatchCount::from(1)); + + let mut dataset 
= Dataset::write( + reader, + "memory://test/binary_copy_nested", + Some(WriteParams { + max_rows_per_file: 1_000, + data_storage_version: Some(version), + ..Default::default() + }), + ) + .await + .unwrap(); + + let before_batch = dataset.scan().try_into_batch().await.unwrap(); + + let options = CompactionOptions { + defer_index_remap: true, + enable_binary_copy: true, + enable_binary_copy_force: true, + ..Default::default() + }; + let _metrics = compact_files(&mut dataset, options, None).await.unwrap(); + + let after_batch = dataset.scan().try_into_batch().await.unwrap(); + + assert_eq!(before_batch, after_batch); +} + +#[tokio::test] +async fn test_binary_copy_preserves_stable_row_ids() { + for version in LanceFileVersion::iter_non_legacy() { + do_binary_copy_preserves_stable_row_ids(version).await; + } +} + +async fn do_binary_copy_preserves_stable_row_ids(version: LanceFileVersion) { + use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector}; + let mut data_gen = BatchGenerator::new() + .col(Box::new( + RandomVector::new().vec_width(8).named("vec".to_owned()), + )) + .col(Box::new(IncrementingInt32::new().named("i".to_owned()))); + + let mut dataset = Dataset::write( + data_gen.batch(4_000), + format!("memory://test/binary_copy_stable_row_ids_{}", version).as_str(), + Some(WriteParams { + enable_stable_row_ids: true, + data_storage_version: Some(version), + max_rows_per_file: 500, + ..Default::default() + }), + ) + .await + .unwrap(); + + dataset + .create_index( + &["i"], + IndexType::Scalar, + Some("scalar".into()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + let params = VectorIndexParams::ivf_pq(1, 8, 1, MetricType::L2, 50); + dataset + .create_index( + &["vec"], + IndexType::Vector, + Some("vector".into()), + &params, + false, + ) + .await + .unwrap(); + + async fn index_set(dataset: &Dataset) -> HashSet<Uuid> { + dataset + .load_indices() + .await + .unwrap() + .iter() + .map(|index| index.uuid) + .collect() + } + let 
indices = index_set(&dataset).await; + + async fn vector_query(dataset: &Dataset) -> RecordBatch { + let mut scanner = dataset.scan(); + let query = Float32Array::from(vec![0.0f32; 8]); + scanner + .nearest("vec", &query, 10) + .unwrap() + .project(&["i"]) + .unwrap(); + scanner.try_into_batch().await.unwrap() + } + + async fn scalar_query(dataset: &Dataset) -> RecordBatch { + let mut scanner = dataset.scan(); + scanner.filter("i = 100").unwrap().project(&["i"]).unwrap(); + scanner.try_into_batch().await.unwrap() + } + + let before_vec_result = vector_query(&dataset).await; + let before_scalar_result = scalar_query(&dataset).await; + + let before_batch = dataset + .scan() + .project(&["vec", "i"]) + .unwrap() + .with_row_id() + .try_into_batch() + .await + .unwrap(); + + let options = CompactionOptions { + target_rows_per_fragment: 2_000, + enable_binary_copy: true, + enable_binary_copy_force: true, + ..Default::default() + }; + let _metrics = compact_files(&mut dataset, options, None).await.unwrap(); + + let current_indices = index_set(&dataset).await; + assert_eq!(indices, current_indices); + + let after_vec_result = vector_query(&dataset).await; + assert_eq!(before_vec_result, after_vec_result); + + let after_scalar_result = scalar_query(&dataset).await; + assert_eq!(before_scalar_result, after_scalar_result); + + let after_batch = dataset + .scan() + .project(&["vec", "i"]) + .unwrap() + .with_row_id() + .try_into_batch() + .await + .unwrap(); + + let before_idx = arrow_ord::sort::sort_to_indices( + before_batch.column_by_name(lance_core::ROW_ID).unwrap(), + None, + None, + ) + .unwrap(); + let after_idx = arrow_ord::sort::sort_to_indices( + after_batch.column_by_name(lance_core::ROW_ID).unwrap(), + None, + None, + ) + .unwrap(); + let before = arrow::compute::take_record_batch(&before_batch, &before_idx).unwrap(); + let after = arrow::compute::take_record_batch(&after_batch, &after_idx).unwrap(); + + assert_eq!(before, after); +} + +#[tokio::test] +async fn 
test_binary_copy_remaps_unstable_row_ids() { + for version in LanceFileVersion::iter_non_legacy() { + do_binary_copy_remaps_unstable_row_ids(version).await; + } +} + +async fn do_binary_copy_remaps_unstable_row_ids(version: LanceFileVersion) { + let mut data_gen = BatchGenerator::new() + .col(Box::new( + RandomVector::new().vec_width(8).named("vec".to_owned()), + )) + .col(Box::new(IncrementingInt32::new().named("i".to_owned()))); + + let mut dataset = Dataset::write( + data_gen.batch(4_000), + "memory://test/binary_copy_no_stable", + Some(WriteParams { + enable_stable_row_ids: false, + data_storage_version: Some(version), + max_rows_per_file: 500, + ..Default::default() + }), + ) + .await + .unwrap(); + + dataset + .create_index( + &["i"], + IndexType::Scalar, + Some("scalar".into()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + let params = VectorIndexParams::ivf_pq(1, 8, 1, MetricType::L2, 50); + dataset + .create_index( + &["vec"], + IndexType::Vector, + Some("vector".into()), + &params, + false, + ) + .await + .unwrap(); + + async fn vector_query(dataset: &Dataset) -> RecordBatch { + let mut scanner = dataset.scan(); + let query = Float32Array::from(vec![0.0f32; 8]); + scanner + .nearest("vec", &query, 10) + .unwrap() + .project(&["i"]) + .unwrap(); + scanner.try_into_batch().await.unwrap() + } + + async fn scalar_query(dataset: &Dataset) -> RecordBatch { + let mut scanner = dataset.scan(); + scanner.filter("i = 100").unwrap().project(&["i"]).unwrap(); + scanner.try_into_batch().await.unwrap() + } + + let before_vec_result = vector_query(&dataset).await; + let before_scalar_result = scalar_query(&dataset).await; + let before_batch = dataset + .scan() + .project(&["vec", "i"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let options = CompactionOptions { + target_rows_per_fragment: 2_000, + enable_binary_copy: true, + enable_binary_copy_force: true, + ..Default::default() + }; + let _metrics = compact_files(&mut dataset, 
options, None).await.unwrap(); + + let after_vec_result = vector_query(&dataset).await; + assert_eq!(before_vec_result, after_vec_result); + + let after_scalar_result = scalar_query(&dataset).await; + assert_eq!(before_scalar_result, after_scalar_result); + + let after_batch = dataset + .scan() + .project(&["vec", "i"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + assert_eq!(before_batch, after_batch); +} + +#[tokio::test] +async fn test_binary_copy_preserves_zonemap_queries() { + use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; + + let mut data_gen = BatchGenerator::new() + .col(Box::new(IncrementingInt32::new().named("a".to_owned()))) + .col(Box::new(IncrementingInt32::new().named("b".to_owned()))); + + let mut dataset = Dataset::write( + data_gen.batch(5_000), + "memory://test/binary_copy_zonemap", + Some(WriteParams { + max_rows_per_file: 500, + data_storage_version: Some(LanceFileVersion::V2_1), + ..Default::default() + }), + ) + .await + .unwrap(); + + let zonemap_params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); + dataset + .create_index( + &["a"], + IndexType::Scalar, + Some("zonemap".into()), + &zonemap_params, + false, + ) + .await + .unwrap(); + + let predicate = "a >= 2500 AND b < 4000"; + let before = dataset + .scan() + .filter(predicate) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let options = CompactionOptions { + target_rows_per_fragment: 100_000, + enable_binary_copy: true, + enable_binary_copy_force: true, + ..Default::default() + }; + compact_files(&mut dataset, options, None).await.unwrap(); + + let after = dataset + .scan() + .filter(predicate) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + assert_eq!(before, after); +} + +#[tokio::test] +async fn test_binary_copy_preserves_bloom_filter_queries() { + use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; + + let mut data_gen = BatchGenerator::new() + 
.col(Box::new(IncrementingInt32::new().named("id".to_owned()))) + .col(Box::new(IncrementingInt32::new().named("val".to_owned()))); + + let mut dataset = Dataset::write( + data_gen.batch(6_000), + "memory://test/binary_copy_bloom", + Some(WriteParams { + max_rows_per_file: 500, + data_storage_version: Some(LanceFileVersion::V2_1), + ..Default::default() + }), + ) + .await + .unwrap(); + + #[derive(serde::Serialize)] + struct BloomParams { + number_of_items: u64, + probability: f64, + } + let bloom_params = + ScalarIndexParams::for_builtin(BuiltinIndexType::BloomFilter).with_params(&BloomParams { + number_of_items: 500, + probability: 0.01, + }); + dataset + .create_index( + &["val"], + IndexType::Scalar, + Some("bloom".into()), + &bloom_params, + false, + ) + .await + .unwrap(); + + let predicate = "val IN (123, 124, 125, 126)"; + let before = dataset + .scan() + .filter(predicate) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + let options = CompactionOptions { + target_rows_per_fragment: 100_000, + enable_binary_copy: true, + enable_binary_copy_force: true, + ..Default::default() + }; + compact_files(&mut dataset, options, None).await.unwrap(); + + let after = dataset + .scan() + .filter(predicate) + .unwrap() + .try_into_batch() + .await + .unwrap(); + + assert_eq!(before, after); +} + +#[tokio::test] +async fn test_binary_copy_fallback_to_common_compaction() { + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + let data = sample_data(); + let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); + let write_params = WriteParams { + max_rows_per_file: 500, + ..Default::default() + }; + let mut dataset = Dataset::write(reader, test_uri, Some(write_params)) + .await + .unwrap(); + dataset.delete("a < 100").await.unwrap(); + + let before = dataset.scan().try_into_batch().await.unwrap(); + + let options = CompactionOptions { + target_rows_per_fragment: 100_000, + enable_binary_copy: true, + ..Default::default() + }; + 
+ let frags: Vec = dataset + .get_fragments() + .into_iter() + .map(Into::into) + .collect(); + assert!(!can_use_binary_copy(&dataset, &options, &frags).await); + + let _metrics = compact_files(&mut dataset, options, None).await.unwrap(); + + let after = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(before, after); +} + +#[tokio::test] +async fn test_can_use_binary_copy_schema_consistency_ok() { + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + let data = sample_data(); + let reader1 = RecordBatchIterator::new(vec![Ok(data.slice(0, 5_000))], data.schema()); + let reader2 = RecordBatchIterator::new(vec![Ok(data.slice(5_000, 5_000))], data.schema()); + let write_params = WriteParams { + max_rows_per_file: 1_000, + ..Default::default() + }; + let mut dataset = Dataset::write(reader1, test_uri, Some(write_params.clone())) + .await + .unwrap(); + dataset.append(reader2, Some(write_params)).await.unwrap(); + + let options = CompactionOptions { + enable_binary_copy: true, + enable_binary_copy_force: true, + ..Default::default() + }; + let frags: Vec = dataset + .get_fragments() + .into_iter() + .map(Into::into) + .collect(); + assert!(can_use_binary_copy(&dataset, &options, &frags).await); +} + +#[tokio::test] +async fn test_can_use_binary_copy_schema_mismatch() { + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + let data = sample_data(); + let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); + let write_params = WriteParams { + max_rows_per_file: 1_000, + ..Default::default() + }; + let dataset = Dataset::write(reader, test_uri, Some(write_params)) + .await + .unwrap(); + + let options = CompactionOptions { + enable_binary_copy: true, + ..Default::default() + }; + let mut frags: Vec = dataset + .get_fragments() + .into_iter() + .map(Into::into) + .collect(); + // Introduce a column index mismatch in the first data file + if let Some(df) = frags.get_mut(0).and_then(|f| f.files.get_mut(0)) { + 
if let Some(first) = df.column_indices.get_mut(0) { + *first = -*first - 1; + } else { + df.column_indices.push(-1); + } + } + assert!(!can_use_binary_copy(&dataset, &options, &frags).await); + + // Also introduce a version mismatch and ensure rejection + if let Some(df) = frags.get_mut(0).and_then(|f| f.files.get_mut(0)) { + df.file_minor_version = if df.file_minor_version == 1 { 2 } else { 1 }; + } + assert!(!can_use_binary_copy(&dataset, &options, &frags).await); +} + +#[tokio::test] +async fn test_can_use_binary_copy_version_mismatch() { + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + let data = sample_data(); + let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); + let write_params = WriteParams { + max_rows_per_file: 500, + data_storage_version: Some(LanceFileVersion::V2_0), + ..Default::default() + }; + let mut dataset = Dataset::write(reader, test_uri, Some(write_params)) + .await + .unwrap(); + + // Append additional data and then mark its files as a newer format version (v2.1). + let reader_append = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); + dataset.append(reader_append, None).await.unwrap(); + + let options = CompactionOptions { + enable_binary_copy: true, + ..Default::default() + }; + let mut frags: Vec = dataset + .get_fragments() + .into_iter() + .map(Into::into) + .collect(); + assert!( + frags.len() >= 2, + "expected multiple fragments for version mismatch test" + ); + + // Simulate mixed file versions by marking the second fragment as v2.1. 
+ let (v21_major, v21_minor) = LanceFileVersion::V2_1.to_numbers(); + for file in &mut frags[1].files { + file.file_major_version = v21_major; + file.file_minor_version = v21_minor; + } + + assert!(!can_use_binary_copy(&dataset, &options, &frags).await); +} + +#[tokio::test] +async fn test_can_use_binary_copy_reject_deletions() { + let test_dir = TempStrDir::default(); + let test_uri = &test_dir; + let data = sample_data(); + let reader = RecordBatchIterator::new(vec![Ok(data.clone())], data.schema()); + let write_params = WriteParams { + max_rows_per_file: 1_000, + ..Default::default() + }; + let mut dataset = Dataset::write(reader, test_uri, Some(write_params)) + .await + .unwrap(); + dataset.delete("a < 10").await.unwrap(); + + let options = CompactionOptions { + enable_binary_copy: true, + ..Default::default() + }; + let frags: Vec = dataset + .get_fragments() + .into_iter() + .map(Into::into) + .collect(); + assert!(!can_use_binary_copy(&dataset, &options, &frags).await); +} + +#[tokio::test] +async fn test_binary_copy_compaction_with_complex_schema() { + for version in LanceFileVersion::iter_non_legacy() { + do_test_binary_copy_compaction_with_complex_schema(version).await; + } +} + +async fn do_test_binary_copy_compaction_with_complex_schema(version: LanceFileVersion) { + use arrow_schema::{DataType, Field, Fields, TimeUnit}; + use lance_core::utils::tempfile::TempStrDir; + use lance_datagen::{array, gen_batch, BatchCount, Dimension, RowCount}; + + let row_num = 1_000; + + let inner_fields = Fields::from(vec![ + Field::new("x", DataType::UInt32, true), + Field::new("y", DataType::LargeUtf8, true), + ]); + let nested_fields = Fields::from(vec![ + Field::new("inner", DataType::Struct(inner_fields.clone()), true), + Field::new("fsb", DataType::FixedSizeBinary(16), true), + Field::new("bin", DataType::Binary, true), + ]); + let event_fields = Fields::from(vec![ + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new("payload", 
DataType::Binary, true), + ]); + + let reader_full = gen_batch() + .col("vec1", array::rand_vec::(Dimension::from(12))) + .col("vec2", array::rand_vec::(Dimension::from(8))) + .col("i32", array::step::()) + .col("i64", array::step::()) + .col("f32", array::rand::()) + .col("f64", array::rand::()) + .col("bool", array::cycle_bool(vec![false, true])) + .col("date32", array::rand_date32()) + .col("date64", array::rand_date64()) + .col( + "ts_ms", + array::rand_timestamp(&DataType::Timestamp(TimeUnit::Millisecond, None)), + ) + .col( + "utf8", + array::rand_utf8(lance_datagen::ByteCount::from(16), false), + ) + .col("large_utf8", array::random_sentence(1, 6, true)) + .col( + "bin", + array::rand_fixedbin(lance_datagen::ByteCount::from(24), false), + ) + .col( + "large_bin", + array::rand_fixedbin(lance_datagen::ByteCount::from(24), true), + ) + .col( + "varbin", + array::rand_varbin( + lance_datagen::ByteCount::from(8), + lance_datagen::ByteCount::from(32), + ), + ) + .col("fsb16", array::rand_fsb(16)) + .col( + "fsl4", + array::cycle_vec(array::rand::(), Dimension::from(4)), + ) + .col("struct_simple", array::rand_struct(inner_fields.clone())) + .col("struct_nested", array::rand_struct(nested_fields)) + .col( + "events", + array::rand_list_any(array::rand_struct(event_fields.clone()), true), + ) + .into_reader_rows(RowCount::from(row_num), BatchCount::from(10)); + + let full_dir = TempStrDir::default(); + let mut dataset = Dataset::write( + reader_full, + &*full_dir, + Some(WriteParams { + enable_stable_row_ids: true, + data_storage_version: Some(version), + max_rows_per_file: (row_num / 100) as usize, + ..Default::default() + }), + ) + .await + .unwrap(); + + let opt_full = CompactionOptions { + enable_binary_copy: false, + ..Default::default() + }; + let opt_binary = CompactionOptions { + enable_binary_copy: true, + enable_binary_copy_force: true, + ..Default::default() + }; + + let _ = compact_files(&mut dataset, opt_full, None).await.unwrap(); + let before = 
dataset.count_rows(None).await.unwrap(); + let batch_before = dataset.scan().try_into_batch().await.unwrap(); + + let mut dataset = dataset.checkout_version(1).await.unwrap(); + + // rollback and trigger another binary copy compaction + dataset.restore().await.unwrap(); + let _ = compact_files(&mut dataset, opt_binary, None).await.unwrap(); + let after = dataset.count_rows(None).await.unwrap(); + let batch_after = dataset.scan().try_into_batch().await.unwrap(); + + assert_eq!(before, after); + assert_eq!(batch_before, batch_after); +}