diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index 585596e513e..8b815078633 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -1064,9 +1064,10 @@ impl Iterator for SpillStreamIter {
 mod tests {
     use super::*;
 
-    use arrow_array::{Int32Array, RecordBatchReader, StructArray};
+    use arrow_array::{Int32Array, RecordBatchIterator, RecordBatchReader, StructArray};
     use arrow_schema::{DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
     use datafusion::{error::DataFusionError, physical_plan::stream::RecordBatchStreamAdapter};
+    use datafusion_physical_plan::RecordBatchStream;
     use futures::TryStreamExt;
     use lance_datagen::{array, gen_batch, BatchCount, RowCount};
     use lance_file::previous::reader::FileReader as PreviousFileReader;
@@ -1206,6 +1207,163 @@ mod tests {
         assert_eq!(fragments.len(), 2);
     }
 
+    #[tokio::test]
+    async fn test_max_rows_per_file() {
+        let reader_to_frags = |data_reader: Box<dyn RecordBatchReader + Send>| {
+            let schema = data_reader.schema();
+            let data_reader =
+                data_reader.map(|rb| rb.map_err(datafusion::error::DataFusionError::from));
+
+            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
+                schema.clone(),
+                futures::stream::iter(data_reader),
+            ));
+
+            let write_params = WriteParams {
+                max_rows_per_file: 5000,                // Limit by rows
+                max_bytes_per_file: 1024 * 1024 * 1024, // Won't be limited by this
+                mode: WriteMode::Create,
+                ..Default::default()
+            };
+
+            async move {
+                let schema = Schema::try_from(schema.as_ref()).unwrap();
+
+                let object_store = Arc::new(ObjectStore::memory());
+                write_fragments_internal(
+                    None,
+                    object_store,
+                    &Path::from("test"),
+                    schema,
+                    data_stream,
+                    write_params,
+                    None,
+                )
+                .await
+            }
+        };
+
+        // Generate 12000 rows total, which should create 3 files:
+        // - File 1: 5000 rows
+        // - File 2: 5000 rows
+        // - File 3: 2000 rows
+        let data_reader = Box::new(
+            gen_batch()
+                .anon_col(array::rand_type(&DataType::Int32))
+                .into_reader_rows(RowCount::from(12000), BatchCount::from(1)),
+        );
+
+        let (fragments, _) = reader_to_frags(data_reader).await.unwrap();
+
+        // Should have 3 fragments
+        assert_eq!(fragments.len(), 3);
+
+        // Verify the row count distribution
+        let row_counts: Vec<usize> = fragments
+            .iter()
+            .map(|f| f.physical_rows.unwrap_or(0))
+            .collect();
+        assert_eq!(row_counts, vec![5000, 5000, 2000]);
+    }
+
+    #[tokio::test]
+    async fn test_max_rows_per_group() {
+        let reader_to_frags = |data_reader: Box<dyn RecordBatchReader + Send>,
+                               version: LanceFileVersion| {
+            let schema = data_reader.schema();
+            let data_reader =
+                data_reader.map(|rb| rb.map_err(datafusion::error::DataFusionError::from));
+
+            let data_stream = Box::pin(RecordBatchStreamAdapter::new(
+                schema.clone(),
+                futures::stream::iter(data_reader),
+            ));
+
+            let write_params = WriteParams {
+                max_rows_per_file: 5000,  // Smaller than total data to force multiple files
+                max_rows_per_group: 3000, // Row group size affects V1 only
+                mode: WriteMode::Create,
+                data_storage_version: Some(version),
+                ..Default::default()
+            };
+
+            async move {
+                let schema = Schema::try_from(schema.as_ref()).unwrap();
+
+                let object_store = Arc::new(ObjectStore::memory());
+                write_fragments_internal(
+                    None,
+                    object_store,
+                    &Path::from("test"),
+                    schema,
+                    data_stream,
+                    write_params,
+                    None,
+                )
+                .await
+            }
+        };
+
+        // Test V1 (Legacy) version: max_rows_per_group affects chunking
+        // With max_rows_per_group=3000 and max_rows_per_file=5000:
+        // - Stream is chunked into batches of max 3000 rows
+        // - Batches are written to files, splitting when file exceeds 5000 rows
+        // For 9000 rows (three 3000-row chunks):
+        // - Chunks 1-2 (3000 rows each) -> File 1 (6000 rows) - exceeds limit, triggers new file
+        // - Chunk 3 (3000 rows) -> File 2 (3000 rows) - start of new file
+        // Result: 2 fragments with [6000, 3000] rows
+        // Note: The exact behavior depends on when file splitting occurs
+        let data_reader_v1 = Box::new(
+            gen_batch()
+                .anon_col(array::rand_type(&DataType::Int32))
+                .into_reader_rows(RowCount::from(9000), BatchCount::from(1)),
+        );
+
+        let (fragments_v1, _) = reader_to_frags(data_reader_v1, LanceFileVersion::Legacy)
+            .await
+            .unwrap();
+        let row_counts_v1: Vec<usize> = fragments_v1
+            .iter()
+            .map(|f| f.physical_rows.unwrap_or(0))
+            .collect();
+
+        // V1 creates 2 fragments based on row group chunking and file size limit
+        assert_eq!(fragments_v1.len(), 2);
+        assert_eq!(row_counts_v1, vec![6000, 3000]);
+
+        // Test V2+ version: max_rows_per_group is ignored, only max_rows_per_file matters
+        // With max_rows_per_file=5000 and 9000 rows:
+        // - Stream is not chunked by row group size
+        // - Data is split only at file boundaries (5000 rows per file)
+        // Result: 2 fragments with [5000, 4000] rows
+        // V2 splits data more evenly at file boundaries regardless of row group size
+        let data_reader_v2 = Box::new(
+            gen_batch()
+                .anon_col(array::rand_type(&DataType::Int32))
+                .into_reader_rows(RowCount::from(9000), BatchCount::from(1)),
+        );
+
+        let (fragments_v2, _) = reader_to_frags(data_reader_v2, LanceFileVersion::Stable)
+            .await
+            .unwrap();
+        let row_counts_v2: Vec<usize> = fragments_v2
+            .iter()
+            .map(|f| f.physical_rows.unwrap_or(0))
+            .collect();
+
+        // V2 should create 2 fragments based on file size only
+        assert_eq!(fragments_v2.len(), 2);
+        assert_eq!(row_counts_v2, vec![5000, 4000]);
+
+        // Key difference: Both V1 and V2 create 2 fragments, but with different distributions
+        // - V1: [6000, 3000] - chunking by row groups affects distribution
+        // - V2: [5000, 4000] - split only at file boundaries, more even
+        // V2 distribution should be more even (closer to 5000/5000 split)
+        // V1 distribution is affected by row group chunking (3000)
+        assert_eq!(fragments_v1.len(), fragments_v2.len());
+        assert_ne!(row_counts_v1, row_counts_v2);
+    }
+
     #[tokio::test]
     async fn test_file_write_version() {
         let schema = Arc::new(ArrowSchema::new(vec![arrow::datatypes::Field::new(
@@ -2180,4 +2338,425 @@ mod tests {
             .collect();
         assert_eq!(base2_fragments.len(), 1, "Should have 1 fragment in base2");
     }
+
+    #[tokio::test]
+    async fn test_empty_stream_write() {
+        use lance_io::object_store::ObjectStore;
+
+        // Test writing an empty stream
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+        let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
+
+        // Create an empty stream
+        let data_stream = Box::pin(RecordBatchStreamAdapter::new(
+            arrow_schema.clone(),
+            futures::stream::iter(std::iter::empty::<
+                std::result::Result<RecordBatch, DataFusionError>,
+            >()),
+        ));
+
+        let object_store = Arc::new(ObjectStore::memory());
+        let write_params = WriteParams {
+            mode: WriteMode::Create,
+            ..Default::default()
+        };
+
+        let result = write_fragments_internal(
+            None,
+            object_store,
+            &Path::from("test_empty"),
+            schema,
+            data_stream,
+            write_params,
+            None,
+        )
+        .await;
+
+        // Empty stream should be handled gracefully
+        // It should create an empty dataset or return an appropriate result
+        match result {
+            Ok((fragments, _)) => {
+                // If successful, verify it creates an empty result
+                assert!(
+                    fragments.is_empty(),
+                    "Empty stream should create no fragments"
+                );
+            }
+            Err(e) => {
+                panic!("Expected write empty stream success, got error: {}", e);
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_schema_mismatch_on_append() {
+        use arrow_array::record_batch;
+
+        // Create initial dataset with two Int32 columns
+        let batch1 = record_batch!(
+            ("id", Int32, [1, 2, 3, 4, 5]),
+            ("value", Int32, [10, 20, 30, 40, 50])
+        )
+        .unwrap();
+
+        let dataset = InsertBuilder::new("memory://")
+            .with_params(&WriteParams {
+                mode: WriteMode::Create,
+                ..Default::default()
+            })
+            .execute(vec![batch1])
+            .await
+            .unwrap();
+
+        // Verify initial dataset
+        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
+        assert_eq!(dataset.schema().fields.len(), 2);
+
+        // Try to append with different schema (Float64 instead of Int32 for 'value' column)
+        let batch2 = record_batch!(
+            ("id", Int32, [6, 7, 8]),
+            ("value", Float64, [60.0, 70.0, 80.0])
+        )
+        .unwrap();
+
+        let result = InsertBuilder::new(Arc::new(dataset.clone()))
+            .with_params(&WriteParams {
+                mode: WriteMode::Append,
+                ..Default::default()
+            })
+            .execute(vec![batch2])
+            .await;
+
+        // Should fail due to schema mismatch
+        assert!(result.is_err(), "Append with mismatched schema should fail");
+        let error = result.unwrap_err();
+        let error_msg = error.to_string().to_lowercase();
+        assert!(
+            error_msg.contains("schema")
+                || error_msg.contains("type")
+                || error_msg.contains("mismatch")
+                || error_msg.contains("field")
+                || error_msg.contains("not found"),
+            "Error should mention schema or type mismatch: {}",
+            error_msg
+        );
+
+        // Verify original dataset is still intact
+        assert_eq!(dataset.count_rows(None).await.unwrap(), 5);
+        assert_eq!(dataset.schema().fields.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_disk_full_error() {
+        use std::io::{self, ErrorKind};
+        use std::sync::Arc;
+
+        use async_trait::async_trait;
+        use object_store::{
+            GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOptions,
+            PutOptions, PutPayload, PutResult,
+        };
+
+        // Create a custom ObjectStore that simulates disk full error
+        #[derive(Debug)]
+        struct DiskFullObjectStore;
+
+        impl std::fmt::Display for DiskFullObjectStore {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                write!(f, "DiskFullObjectStore")
+            }
+        }
+
+        #[async_trait]
+        impl object_store::ObjectStore for DiskFullObjectStore {
+            async fn put(
+                &self,
+                _location: &object_store::path::Path,
+                _bytes: PutPayload,
+            ) -> object_store::Result<PutResult> {
+                Err(object_store::Error::Generic {
+                    store: "DiskFullStore",
+                    source: Box::new(io::Error::new(
+                        ErrorKind::StorageFull,
+                        "No space left on device",
+                    )),
+                })
+            }
+
+            async fn put_opts(
+                &self,
+                _location: &object_store::path::Path,
+                _bytes: PutPayload,
+                _opts: PutOptions,
+            ) -> object_store::Result<PutResult> {
+                Err(object_store::Error::Generic {
+                    store: "DiskFullStore",
+                    source: Box::new(io::Error::new(
+                        ErrorKind::StorageFull,
+                        "No space left on device",
+                    )),
+                })
+            }
+
+            async fn put_multipart(
+                &self,
+                _location: &object_store::path::Path,
+            ) -> object_store::Result<Box<dyn MultipartUpload>> {
+                Err(object_store::Error::NotSupported {
+                    source: "Multipart upload not supported".into(),
+                })
+            }
+
+            async fn put_multipart_opts(
+                &self,
+                _location: &object_store::path::Path,
+                _opts: PutMultipartOptions,
+            ) -> object_store::Result<Box<dyn MultipartUpload>> {
+                Err(object_store::Error::NotSupported {
+                    source: "Multipart upload not supported".into(),
+                })
+            }
+
+            async fn get(
+                &self,
+                _location: &object_store::path::Path,
+            ) -> object_store::Result<GetResult> {
+                Err(object_store::Error::NotFound {
+                    path: "".into(),
+                    source: "".into(),
+                })
+            }
+
+            async fn get_opts(
+                &self,
+                _location: &object_store::path::Path,
+                _options: GetOptions,
+            ) -> object_store::Result<GetResult> {
+                Err(object_store::Error::NotFound {
+                    path: "".into(),
+                    source: "".into(),
+                })
+            }
+
+            async fn delete(
+                &self,
+                _location: &object_store::path::Path,
+            ) -> object_store::Result<()> {
+                Ok(())
+            }
+
+            fn list(
+                &self,
+                _prefix: Option<&object_store::path::Path>,
+            ) -> futures::stream::BoxStream<'static, object_store::Result<ObjectMeta>> {
+                Box::pin(futures::stream::empty())
+            }
+
+            async fn list_with_delimiter(
+                &self,
+                _prefix: Option<&object_store::path::Path>,
+            ) -> object_store::Result<ListResult> {
+                Ok(ListResult {
+                    common_prefixes: vec![],
+                    objects: vec![],
+                })
+            }
+
+            async fn copy(
+                &self,
+                _from: &object_store::path::Path,
+                _to: &object_store::path::Path,
+            ) -> object_store::Result<()> {
+                Ok(())
+            }
+
+            async fn copy_if_not_exists(
+                &self,
+                _from: &object_store::path::Path,
+                _to: &object_store::path::Path,
+            ) -> object_store::Result<()> {
+                Ok(())
+            }
+        }
+
+        let object_store = Arc::new(lance_io::object_store::ObjectStore::new(
+            Arc::new(DiskFullObjectStore) as Arc<dyn object_store::ObjectStore>,
+            url::Url::parse("file:///test").unwrap(),
+            None,
+            None,
+            false,
+            true,
+            lance_io::object_store::DEFAULT_LOCAL_IO_PARALLELISM,
+            lance_io::object_store::DEFAULT_DOWNLOAD_RETRY_COUNT,
+            None,
+        ));
+
+        // Create test data
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            arrow_schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
+        )
+        .unwrap();
+
+        let data_reader = Box::new(RecordBatchIterator::new(
+            vec![Ok(batch)].into_iter(),
+            arrow_schema.clone(),
+        ));
+
+        let data_stream = Box::pin(RecordBatchStreamAdapter::new(
+            arrow_schema,
+            futures::stream::iter(data_reader.map(|rb| rb.map_err(DataFusionError::from))),
+        ));
+
+        let schema = Schema::try_from(data_stream.schema().as_ref()).unwrap();
+
+        let write_params = WriteParams {
+            mode: WriteMode::Create,
+            ..Default::default()
+        };
+
+        // Attempt to write data - should fail with IO error due to disk full
+        let result = write_fragments_internal(
+            None,
+            object_store,
+            &Path::from("test_disk_full"),
+            schema,
+            data_stream,
+            write_params,
+            None,
+        )
+        .await;
+
+        // Verify that the error is an IO error (which wraps the disk full error)
+        assert!(result.is_err(), "Write should fail when disk is full");
+        let error = result.unwrap_err();
+        let error_msg = error.to_string().to_lowercase();
+
+        // The error should mention IO, space, or storage
+        assert!(
+            error_msg.contains("io")
+                || error_msg.contains("space")
+                || error_msg.contains("storage")
+                || error_msg.contains("full"),
+            "Error should mention IO, space, or storage: {}",
+            error_msg
+        );
+
+        // Verify it's an IO error type
+        assert!(
+            matches!(error, lance_core::Error::IO { .. }),
+            "Expected IO error, got: {}",
+            error
+        );
+    }
+
+    /// Test that dataset remains consistent after write interruption and can recover.
+    /// This verifies that:
+    /// 1. The dataset is not corrupted when a write is interrupted (not committed)
+    /// 2. Incomplete data files are not visible until committed
+    /// 3. The transaction can be retried successfully
+    #[tokio::test]
+    async fn test_write_interruption_recovery() {
+        use super::commit::CommitBuilder;
+        use arrow_array::record_batch;
+
+        // Create a temporary directory for testing
+        let temp_dir = TempDir::default();
+        let dataset_uri = format!("file://{}", temp_dir.std_path().display());
+
+        // First, create a normal dataset with some initial data
+        let batch =
+            record_batch!(("id", Int32, [1, 2, 3]), ("value", Utf8, ["a", "b", "c"])).unwrap();
+
+        // Write initial dataset normally
+        let dataset = InsertBuilder::new(&dataset_uri)
+            .execute(vec![batch.clone()])
+            .await
+            .unwrap();
+
+        // Verify initial dataset is valid
+        assert_eq!(dataset.count_rows(None).await.unwrap(), 3);
+
+        // Prepare additional data to write
+        let new_batch =
+            record_batch!(("id", Int32, [4, 5, 6]), ("value", Utf8, ["d", "e", "f"])).unwrap();
+
+        // Step 1: Write uncommitted data (simulates interrupted write before commit)
+        let uncommitted_result = InsertBuilder::new(WriteDestination::Dataset(Arc::new(
+            Dataset::open(&dataset_uri).await.unwrap(),
+        )))
+        .with_params(&WriteParams {
+            mode: WriteMode::Append,
+            ..Default::default()
+        })
+        .execute_uncommitted(vec![new_batch])
+        .await;
+
+        // The uncommitted write should succeed (data is written to files)
+        assert!(
+            uncommitted_result.is_ok(),
+            "Uncommitted write should succeed"
+        );
+        let transaction = uncommitted_result.unwrap();
+
+        // Step 2: Verify dataset is still consistent (uncommitted changes not visible)
+        let dataset_before_commit = Dataset::open(&dataset_uri).await.unwrap();
+        let row_count_before = dataset_before_commit.count_rows(None).await.unwrap();
+        assert_eq!(
+            row_count_before, 3,
+            "Dataset should still have only original 3 rows (uncommitted data not visible)"
+        );
+
+        // Step 3: Commit the transaction (simulates retry after interruption)
+        let commit_result = CommitBuilder::new(&dataset_uri).execute(transaction).await;
+        commit_result.unwrap();
+
+        // Step 4: Verify dataset now has all 6 rows after successful commit
+        let dataset_after_commit = Dataset::open(&dataset_uri).await.unwrap();
+        let row_count_after = dataset_after_commit.count_rows(None).await.unwrap();
+        assert_eq!(
+            row_count_after, 6,
+            "Dataset should have all 6 rows after commit"
+        );
+
+        // Verify data integrity
+        let mut scanner = dataset_after_commit.scan();
+        scanner.project(&["id", "value"]).unwrap();
+        let batches = scanner
+            .try_into_stream()
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        let all_ids: Vec<i32> = batches
+            .iter()
+            .flat_map(|batch| {
+                batch
+                    .column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .iter()
+                    .flatten()
+            })
+            .collect();
+
+        assert_eq!(
+            all_ids,
+            vec![1, 2, 3, 4, 5, 6],
+            "All data should be correctly written"
+        );
+    }
 }