From 05f383c7ab887b6e80a1c5bde663c9e7f35cf670 Mon Sep 17 00:00:00 2001 From: fenfeng9 Date: Sat, 18 Oct 2025 20:24:33 +0800 Subject: [PATCH 1/4] fix(merge_insert): Allow permissive nullability for source schema This aligns merge_insert behavior with insert by ignoring nullability differences when no actual nulls are present in the source data. Fixes #4518 --- rust/lance/src/dataset/write/merge_insert.rs | 114 +++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 28ed91f77b3..d535d0f14f9 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -105,6 +105,7 @@ use std::{ time::Duration, }; use tokio::task::JoinSet; +use lance_core::datatypes::NullabilityComparison; mod assign_action; mod exec; @@ -534,6 +535,7 @@ impl MergeInsertJob { self.dataset.schema(), &SchemaCompareOptions { compare_dictionary: self.dataset.is_legacy_storage(), + compare_nullability: NullabilityComparison::Ignore, ..Default::default() }, ); @@ -1397,6 +1399,8 @@ impl MergeInsertJob { &lance_schema, &SchemaCompareOptions { compare_metadata: false, + // Allow nullable source fields for non-nullable targets. + compare_nullability: NullabilityComparison::Ignore, ..Default::default() }, ); @@ -1437,6 +1441,8 @@ impl MergeInsertJob { &lance_schema, &SchemaCompareOptions { compare_metadata: false, + // Allow nullable source fields for non-nullable targets. + compare_nullability: NullabilityComparison::Ignore, ..Default::default() }, ); @@ -4934,4 +4940,112 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n .unwrap(); assert_eq!(count, 3); } + + /// Test case for Issue #4654: merge_insert should handle nullable source fields + /// when target is non-nullable, as long as there are no actual null values. 
+ /// + /// This test verifies that: + /// - Dataset has non-nullable fields + /// - Source data has nullable fields BUT no actual null values + /// - merge_insert() succeeds (same behavior as insert) + #[tokio::test] + async fn test_merge_insert_permissive_nullability() { + // Step 1: Create dataset with NON-NULLABLE schema + let non_nullable_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), // nullable=False + Field::new("value", DataType::Int64, false), // nullable=False + ])); + + let initial_data = RecordBatch::try_new( + non_nullable_schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(Int64Array::from(vec![100, 200, 300])), + ], + ) + .unwrap(); + + let test_uri = "memory://test_nullable_issue_4654"; + let dataset = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial_data)], non_nullable_schema.clone()), + test_uri, + None, + ) + .await + .unwrap(); + + // Step 2: Create new data with NULLABLE schema but NO actual null values + let nullable_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, true), // nullable=True + Field::new("value", DataType::Int64, true), // nullable=True + ])); + + let new_data = RecordBatch::try_new( + nullable_schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![2, 4, 5])), // id=2 exists (update), 4,5 new (insert) + Arc::new(Int64Array::from(vec![999, 400, 500])), // No nulls + ], + ) + .unwrap(); + + // Step 3: Test merge_insert() + let merge_result = MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(new_data.clone())], + nullable_schema.clone(), + ))) + .await; + + assert!( + merge_result.is_ok(), + "merge_insert() should succeed with nullable fields but no actual nulls. \ + This is the same behavior as insert/append. 
Error: {:?}", + merge_result.err() + ); + + // Step 4: Verify the results + let (merged_dataset, stats) = merge_result.unwrap(); + + // Should have: 1 updated row (id=2), 2 new rows (id=4,5) + assert_eq!(stats.num_updated_rows, 1, "Should update 1 row (id=2)"); + assert_eq!( + stats.num_inserted_rows, 2, + "Should insert 2 new rows (id=4,5)" + ); + + // Total: 3 original (id=1,2,3) + 2 new (id=4,5) = 5 rows + let count = merged_dataset.count_rows(None).await.unwrap(); + assert_eq!(count, 5, "Should have 5 total rows"); + + // Verify the updated value for id=2 + let result = merged_dataset + .scan() + .filter("id = 2") + .unwrap() + .try_into_stream() + .await + .unwrap() + .try_collect::<Vec<_>>() + .await + .unwrap(); + + let batch = concat_batches(&result[0].schema(), &result).unwrap(); + assert_eq!(batch.num_rows(), 1); + let value_array = batch + .column(1) + .as_any() + .downcast_ref::<Int64Array>() + .unwrap(); + assert_eq!( + value_array.value(0), + 999, + "Value for id=2 should be updated to 999" + ); + } } From 032238c3928040b3d3eb14217c29fd45f24249e4 Mon Sep 17 00:00:00 2001 From: fenfeng9 Date: Sat, 18 Oct 2025 20:54:41 +0800 Subject: [PATCH 2/4] refactor(merge_insert): Use core allow_subschema for schema validation Replaces the local is_subschema implementation with the allow_subschema option from lance-core. This simplifies the validation logic and improves error reporting for invalid subschemas. 
Fixes #3634 --- rust/lance-core/src/datatypes/field.rs | 2 + rust/lance-core/src/datatypes/schema.rs | 10 +- rust/lance/src/dataset/write/merge_insert.rs | 147 ++++++++++++++----- 3 files changed, 118 insertions(+), 41 deletions(-) diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 3d1463a02f1..4ca905babde 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -71,6 +71,8 @@ pub struct SchemaCompareOptions { pub allow_missing_if_nullable: bool, /// Allow out of order fields (default false) pub ignore_field_order: bool, + /// Allow the source schema to be a subset of the target schema (default false) + pub allow_subschema: bool, } /// Encoding enum. #[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)] diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs index 0fbd8df69fd..5defe6c36cf 100644 --- a/rust/lance-core/src/datatypes/schema.rs +++ b/rust/lance-core/src/datatypes/schema.rs @@ -778,7 +778,7 @@ pub fn compare_fields( expected: &[Field], options: &SchemaCompareOptions, ) -> bool { - if options.allow_missing_if_nullable || options.ignore_field_order { + if options.allow_missing_if_nullable || options.ignore_field_order || options.allow_subschema { let expected_names = expected .iter() .map(|f| f.name.as_str()) @@ -805,6 +805,9 @@ pub fn compare_fields( return false; } cumulative_position = *pos; + } else if options.allow_subschema { + // allow_subschema: any expected field may be missing + continue; } else if options.allow_missing_if_nullable && expected_field.nullable { continue; } else { @@ -852,7 +855,10 @@ pub fn explain_fields_difference( .map(prepend_path) .collect::>(); let missing_fields = expected_names.difference(&field_names); - let missing_fields = if options.allow_missing_if_nullable { + let missing_fields = if options.allow_subschema { + // allow_subschema: don't report any missing fields + Vec::new() + } else if 
options.allow_missing_if_nullable { missing_fields .filter(|f| { let expected_field = expected.iter().find(|ef| ef.name == **f).unwrap(); diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index d535d0f14f9..bd09335df6d 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -71,6 +71,7 @@ use futures::{ Stream, StreamExt, TryStreamExt, }; use lance_arrow::{interleave_batches, RecordBatchExt, SchemaExt}; +use lance_core::datatypes::NullabilityComparison; use lance_core::utils::address::RowAddress; use lance_core::{ datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions}, @@ -105,7 +106,6 @@ use std::{ time::Duration, }; use tokio::task::JoinSet; -use lance_core::datatypes::NullabilityComparison; mod assign_action; mod exec; @@ -531,40 +531,29 @@ impl MergeInsertJob { fn check_compatible_schema(&self, schema: &Schema) -> Result<SchemaComparison> { let lance_schema: lance_core::datatypes::Schema = schema.try_into()?; - let is_compatible = lance_schema.check_compatible( - self.dataset.schema(), - &SchemaCompareOptions { - compare_dictionary: self.dataset.is_legacy_storage(), - compare_nullability: NullabilityComparison::Ignore, - ..Default::default() - }, - ); + let target_schema = self.dataset.schema(); - fn is_subschema(schema: &Schema, candidate: &Schema) -> bool { - // Schema::contains() cares about order, but we don't. 
- for field in candidate.fields() { - if !schema - .field_with_name(field.name()) - .map(|f| f.contains(field)) - .unwrap_or(false) - { - return false; - } - } - true - } + let mut options = SchemaCompareOptions { + compare_dictionary: self.dataset.is_legacy_storage(), + compare_nullability: NullabilityComparison::Ignore, + ..Default::default() + }; - if let Err(e) = is_compatible { - // It might be a subschema - let dataset_arrow_schema = Schema::from(self.dataset.schema()); - if is_subschema(&dataset_arrow_schema, schema) { - Ok(SchemaComparison::Subschema) - } else { - Err(e) - } - } else { - Ok(SchemaComparison::FullCompatible) + // Try full schema match first. + if lance_schema + .check_compatible(&target_schema, &options) + .is_ok() + { + return Ok(SchemaComparison::FullCompatible); } + + // If full match fails, try subschema match. + options.allow_subschema = true; + options.ignore_field_order = true; // Subschema matching should typically ignore order. + + lance_schema + .check_compatible(&target_schema, &options) + .map(|_| SchemaComparison::Subschema) } async fn join_key_as_scalar_index(&self) -> Result> { @@ -2118,8 +2107,8 @@ mod tests { use arrow_array::types::Float32Type; use arrow_array::{ types::{Int32Type, UInt32Type}, - FixedSizeListArray, Float32Array, Int32Array, Int64Array, RecordBatchIterator, - RecordBatchReader, StringArray, UInt32Array, + FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array, + RecordBatchIterator, RecordBatchReader, StringArray, UInt32Array, }; use arrow_select::concat::concat_batches; use datafusion::common::Column; @@ -4963,7 +4952,7 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n Arc::new(Int64Array::from(vec![100, 200, 300])), ], ) - .unwrap(); + .unwrap(); let test_uri = "memory://test_nullable_issue_4654"; let dataset = Dataset::write( @@ -4971,12 +4960,12 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n test_uri, None, ) - .await 
- .unwrap(); + .await + .unwrap(); // Step 2: Create new data with NULLABLE schema but NO actual null values let nullable_schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::Int64, true), // nullable=True + Field::new("id", DataType::Int64, true), // nullable=True Field::new("value", DataType::Int64, true), // nullable=True ])); @@ -4987,7 +4976,7 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n Arc::new(Int64Array::from(vec![999, 400, 500])), // No nulls ], ) - .unwrap(); + .unwrap(); // Step 3: Test merge_insert() let merge_result = MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()]) @@ -5048,4 +5037,84 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n "Value for id=2 should be updated to 999" ); } + + /// Test case for Issue #3634: merge_insert should provide a helpful error + /// message when a subschema with a mismatched type is provided. + #[tokio::test] + async fn test_merge_insert_subschema_invalid_type_error() { + // Step 1: Create a dataset with a multi-column schema. + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Float64, true), // The target type is Float64. + Field::new("extra", DataType::Utf8, true), + ])); + + let initial_data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + + let test_uri = "memory://test_issue_3634"; + let dataset = Dataset::write( + RecordBatchIterator::new(vec![Ok(initial_data)], schema), + test_uri, + None, + ) + .await + .unwrap(); + + // Step 2: Create source data with a subschema where one field has a wrong type. 
+ let subschema_with_wrong_type = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Int32, true), + ])); + + let new_data = RecordBatch::try_new( + subschema_with_wrong_type.clone(), + vec![ + Arc::new(Int32Array::from(vec![2, 4])), + Arc::new(Int32Array::from(vec![22, 44])), + ], + ) + .unwrap(); + + // Step 3: Execute the merge_insert operation, which should fail. + let merge_result = MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()]) + .unwrap() + .when_matched(WhenMatched::UpdateAll) + .when_not_matched(WhenNotMatched::InsertAll) + .try_build() + .unwrap() + .execute_reader(Box::new(RecordBatchIterator::new( + vec![Ok(new_data)], + subschema_with_wrong_type, + ))) + .await; + + // Step 4: Verify that the operation failed with the correct error type and message. + let err = merge_result.expect_err("Merge insert should have failed but it succeeded."); + assert!( + matches!(err, lance_core::Error::SchemaMismatch { .. 
}), + "Expected a SchemaMismatch error, but got a different error type: {:?}", + err + ); + + let error_message = err.to_string(); + assert!( + error_message.contains("`value` should have type double but type was int32"), + "Error message should specify the expected (double) and actual (int32) types for 'value', but was: {}", + error_message + ); + + assert!( + !error_message.contains("missing="), + "Error message should NOT complain about missing fields for a subschema check, but was: {}", + error_message + ); + } } From 8760cef3a92a3cf899c7af64f94bcd2ccd315aea Mon Sep 17 00:00:00 2001 From: fenfeng9 Date: Sat, 18 Oct 2025 22:42:22 +0800 Subject: [PATCH 3/4] test(python): Add test for permissive nullability in merge_insert --- python/python/tests/test_dataset.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 3f29ad40103..9bb54b2525f 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -2456,6 +2456,56 @@ def test_add_null_columns(tmp_path: Path): ) +def test_merge_insert_permissive_nullability(tmp_path): + """ + Reported in https://github.com/lancedb/lance/issues/4518 + Tests that merge_insert works when the source schema is nullable + but the target is not, as long as no nulls are present. + """ + target_schema = pa.schema( + [ + pa.field("id", pa.int64(), nullable=False), + pa.field("value", pa.int64(), nullable=False), + ] + ) + initial_data = pa.table( + {"id": [1, 2, 3], "value": [10, 20, 30]}, schema=target_schema + ) + + uri = tmp_path / "dataset" + ds = lance.write_dataset(initial_data, uri) + + source_schema = pa.schema( + [ + pa.field("id", pa.int64(), nullable=True), + pa.field("value", pa.int64(), nullable=True), + ] + ) + + new_data = pa.table( + {"id": [2, 4, 5], "value": [200, 400, 500]}, schema=source_schema + ) + + # Execute merge_insert, which should now succeed. 
+ stats = ( + ds.merge_insert("id") + .when_matched_update_all() + .when_not_matched_insert_all() + .execute(new_data) + ) + + # Verify the results. + assert stats["num_updated_rows"] == 1 + assert stats["num_inserted_rows"] == 2 + + expected_data = pa.table( + {"id": [1, 2, 3, 4, 5], "value": [10, 200, 30, 400, 500]}, schema=target_schema + ) + + result_table = ds.to_table() + assert result_table.sort_by("id").equals(expected_data.sort_by("id")) + + def test_add_null_columns_with_conflict_names(tmp_path: Path): data = pa.table({"id": [1, 2, 4]}) ds = lance.write_dataset(data, tmp_path) From 0f53ed2bd14c6496967e87f153e9d1b3475c3a3d Mon Sep 17 00:00:00 2001 From: fenfeng9 Date: Sun, 19 Oct 2025 19:40:24 +0800 Subject: [PATCH 4/4] fix: remove needless borrow --- rust/lance/src/dataset/write/merge_insert.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index bd09335df6d..75d4496cec0 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -541,7 +541,7 @@ impl MergeInsertJob { // Try full schema match first. if lance_schema - .check_compatible(&target_schema, &options) + .check_compatible(target_schema, &options) .is_ok() { return Ok(SchemaComparison::FullCompatible); @@ -552,7 +552,7 @@ impl MergeInsertJob { options.ignore_field_order = true; // Subschema matching should typically ignore order. lance_schema - .check_compatible(&target_schema, &options) + .check_compatible(target_schema, &options) .map(|_| SchemaComparison::Subschema) }