Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2456,6 +2456,56 @@ def test_add_null_columns(tmp_path: Path):
)


def test_merge_insert_permissive_nullability(tmp_path):
    """
    Regression test for https://github.com/lancedb/lance/issues/4518.

    merge_insert must accept a source whose fields are declared nullable
    when the target's fields are non-nullable, provided the source data
    contains no actual null values.
    """
    # Target dataset: both columns declared non-nullable.
    non_nullable = pa.schema(
        [
            pa.field("id", pa.int64(), nullable=False),
            pa.field("value", pa.int64(), nullable=False),
        ]
    )
    ds = lance.write_dataset(
        pa.table({"id": [1, 2, 3], "value": [10, 20, 30]}, schema=non_nullable),
        tmp_path / "dataset",
    )

    # Source batch: identical columns, but declared nullable (no nulls present).
    nullable = pa.schema(
        [
            pa.field("id", pa.int64(), nullable=True),
            pa.field("value", pa.int64(), nullable=True),
        ]
    )
    source = pa.table({"id": [2, 4, 5], "value": [200, 400, 500]}, schema=nullable)

    # id=2 matches an existing row (update); id=4 and id=5 are new (insert).
    stats = (
        ds.merge_insert("id")
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute(source)
    )

    assert stats["num_updated_rows"] == 1
    assert stats["num_inserted_rows"] == 2

    expected = pa.table(
        {"id": [1, 2, 3, 4, 5], "value": [10, 200, 30, 400, 500]},
        schema=non_nullable,
    )
    assert ds.to_table().sort_by("id").equals(expected.sort_by("id"))


def test_add_null_columns_with_conflict_names(tmp_path: Path):
data = pa.table({"id": [1, 2, 4]})
ds = lance.write_dataset(data, tmp_path)
Expand Down
2 changes: 2 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub struct SchemaCompareOptions {
pub allow_missing_if_nullable: bool,
/// Allow out of order fields (default false)
pub ignore_field_order: bool,
/// Allow the source schema to be a subset of the target schema (default false)
pub allow_subschema: bool,
}
/// Encoding enum.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
Expand Down
10 changes: 8 additions & 2 deletions rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ pub fn compare_fields(
expected: &[Field],
options: &SchemaCompareOptions,
) -> bool {
if options.allow_missing_if_nullable || options.ignore_field_order {
if options.allow_missing_if_nullable || options.ignore_field_order || options.allow_subschema {
let expected_names = expected
.iter()
.map(|f| f.name.as_str())
Expand All @@ -797,6 +797,9 @@ pub fn compare_fields(
return false;
}
cumulative_position = *pos;
} else if options.allow_subschema {
// allow_subschema: allow missing any field
continue;
} else if options.allow_missing_if_nullable && expected_field.nullable {
continue;
} else {
Expand Down Expand Up @@ -844,7 +847,10 @@ pub fn explain_fields_difference(
.map(prepend_path)
.collect::<Vec<_>>();
let missing_fields = expected_names.difference(&field_names);
let missing_fields = if options.allow_missing_if_nullable {
let missing_fields = if options.allow_subschema {
// allow_subschema: don't report any missing fields
Vec::new()
} else if options.allow_missing_if_nullable {
missing_fields
.filter(|f| {
let expected_field = expected.iter().find(|ef| ef.name == **f).unwrap();
Expand Down
247 changes: 215 additions & 32 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ use futures::{
Stream, StreamExt, TryStreamExt,
};
use lance_arrow::{interleave_batches, RecordBatchExt, SchemaExt};
use lance_core::datatypes::NullabilityComparison;
use lance_core::utils::address::RowAddress;
use lance_core::{
datatypes::{OnMissing, OnTypeMismatch, SchemaCompareOptions},
Expand Down Expand Up @@ -530,39 +531,29 @@ impl MergeInsertJob {

fn check_compatible_schema(&self, schema: &Schema) -> Result<SchemaComparison> {
let lance_schema: lance_core::datatypes::Schema = schema.try_into()?;
let is_compatible = lance_schema.check_compatible(
self.dataset.schema(),
&SchemaCompareOptions {
compare_dictionary: self.dataset.is_legacy_storage(),
..Default::default()
},
);
let target_schema = self.dataset.schema();

fn is_subschema(schema: &Schema, candidate: &Schema) -> bool {
// Schema::contains() cares about order, but we don't.
for field in candidate.fields() {
if !schema
.field_with_name(field.name())
.map(|f| f.contains(field))
.unwrap_or(false)
{
return false;
}
}
true
}
let mut options = SchemaCompareOptions {
compare_dictionary: self.dataset.is_legacy_storage(),
compare_nullability: NullabilityComparison::Ignore,
..Default::default()
};

if let Err(e) = is_compatible {
// It might be a subschema
let dataset_arrow_schema = Schema::from(self.dataset.schema());
if is_subschema(&dataset_arrow_schema, schema) {
Ok(SchemaComparison::Subschema)
} else {
Err(e)
}
} else {
Ok(SchemaComparison::FullCompatible)
// Try full schema match first.
if lance_schema
.check_compatible(target_schema, &options)
.is_ok()
{
return Ok(SchemaComparison::FullCompatible);
}

// If full match fails, try subschema match.
options.allow_subschema = true;
options.ignore_field_order = true; // Subschema matching should typically ignore order.

lance_schema
.check_compatible(target_schema, &options)
.map(|_| SchemaComparison::Subschema)
}

async fn join_key_as_scalar_index(&self) -> Result<Option<IndexMetadata>> {
Expand Down Expand Up @@ -1397,6 +1388,8 @@ impl MergeInsertJob {
&lance_schema,
&SchemaCompareOptions {
compare_metadata: false,
// Allow nullable source fields for non-nullable targets.
compare_nullability: NullabilityComparison::Ignore,
..Default::default()
},
);
Expand Down Expand Up @@ -1437,6 +1430,8 @@ impl MergeInsertJob {
&lance_schema,
&SchemaCompareOptions {
compare_metadata: false,
// Allow nullable source fields for non-nullable targets.
compare_nullability: NullabilityComparison::Ignore,
..Default::default()
},
);
Expand Down Expand Up @@ -2112,8 +2107,8 @@ mod tests {
use arrow_array::types::Float32Type;
use arrow_array::{
types::{Int32Type, UInt32Type},
FixedSizeListArray, Float32Array, Int32Array, Int64Array, RecordBatchIterator,
RecordBatchReader, StringArray, UInt32Array,
FixedSizeListArray, Float32Array, Float64Array, Int32Array, Int64Array,
RecordBatchIterator, RecordBatchReader, StringArray, UInt32Array,
};
use arrow_select::concat::concat_batches;
use datafusion::common::Column;
Expand Down Expand Up @@ -4934,4 +4929,192 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n
.unwrap();
assert_eq!(count, 3);
}

/// Test case for Issue #4654: merge_insert should handle nullable source fields
/// when target is non-nullable, as long as there are no actual null values.
///
/// Scenario:
/// - The dataset is created with non-nullable fields.
/// - The source data declares nullable fields but carries no nulls.
/// - merge_insert() must succeed, matching the behavior of insert/append.
#[tokio::test]
async fn test_merge_insert_permissive_nullability() {
    // Target dataset uses a strictly non-nullable schema.
    let strict_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Int64, false),
    ]));

    let seed_batch = RecordBatch::try_new(
        strict_schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])),
            Arc::new(Int64Array::from(vec![100, 200, 300])),
        ],
    )
    .unwrap();

    let dataset = Dataset::write(
        RecordBatchIterator::new(vec![Ok(seed_batch)], strict_schema.clone()),
        "memory://test_nullable_issue_4654",
        None,
    )
    .await
    .unwrap();

    // Source batch declares both fields nullable, but no null is present.
    let relaxed_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("value", DataType::Int64, true),
    ]));

    let source_batch = RecordBatch::try_new(
        relaxed_schema.clone(),
        vec![
            // id=2 already exists (update path); 4 and 5 are fresh (insert path).
            Arc::new(Int64Array::from(vec![2, 4, 5])),
            // No nulls anywhere.
            Arc::new(Int64Array::from(vec![999, 400, 500])),
        ],
    )
    .unwrap();

    // Run the merge; this is the operation under test.
    let merge_result = MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()])
        .unwrap()
        .when_matched(WhenMatched::UpdateAll)
        .when_not_matched(WhenNotMatched::InsertAll)
        .try_build()
        .unwrap()
        .execute_reader(Box::new(RecordBatchIterator::new(
            vec![Ok(source_batch.clone())],
            relaxed_schema.clone(),
        )))
        .await;

    assert!(
        merge_result.is_ok(),
        "merge_insert() should succeed with nullable fields but no actual nulls. \
        This is the same behavior as insert/append. Error: {:?}",
        merge_result.err()
    );

    // Inspect the statistics: 1 update (id=2) and 2 inserts (id=4,5).
    let (merged_dataset, stats) = merge_result.unwrap();

    assert_eq!(stats.num_updated_rows, 1, "Should update 1 row (id=2)");
    assert_eq!(
        stats.num_inserted_rows, 2,
        "Should insert 2 new rows (id=4,5)"
    );

    // 3 original rows (id=1,2,3) plus 2 inserted rows (id=4,5).
    let total = merged_dataset.count_rows(None).await.unwrap();
    assert_eq!(total, 5, "Should have 5 total rows");

    // Confirm the matched row actually received the new value.
    let batches = merged_dataset
        .scan()
        .filter("id = 2")
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    let combined = concat_batches(&batches[0].schema(), &batches).unwrap();
    assert_eq!(combined.num_rows(), 1);
    let values = combined
        .column(1)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(
        values.value(0),
        999,
        "Value for id=2 should be updated to 999"
    );
}

/// Test case for Issue #3634: merge_insert should provide a helpful error
/// message when a subschema with a mismatched type is provided.
#[tokio::test]
async fn test_merge_insert_subschema_invalid_type_error() {
    // Target dataset has three columns; `value` is Float64.
    let full_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("value", DataType::Float64, true), // The target type is Float64.
        Field::new("extra", DataType::Utf8, true),
    ]));

    let seed = RecordBatch::try_new(
        full_schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .unwrap();

    let dataset = Dataset::write(
        RecordBatchIterator::new(vec![Ok(seed)], full_schema),
        "memory://test_issue_3634",
        None,
    )
    .await
    .unwrap();

    // Source is a subset of the target columns, but `value` has the wrong
    // type (Int32 instead of Float64).
    let bad_subschema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("value", DataType::Int32, true),
    ]));

    let source = RecordBatch::try_new(
        bad_subschema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![2, 4])),
            Arc::new(Int32Array::from(vec![22, 44])),
        ],
    )
    .unwrap();

    // The merge is expected to fail on the type mismatch.
    let merge_result = MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()])
        .unwrap()
        .when_matched(WhenMatched::UpdateAll)
        .when_not_matched(WhenNotMatched::InsertAll)
        .try_build()
        .unwrap()
        .execute_reader(Box::new(RecordBatchIterator::new(
            vec![Ok(source)],
            bad_subschema,
        )))
        .await;

    // The failure must be a SchemaMismatch that names the conflicting field
    // and both types involved.
    let err = merge_result.expect_err("Merge insert should have failed but it succeeded.");
    assert!(
        matches!(err, lance_core::Error::SchemaMismatch { .. }),
        "Expected a SchemaMismatch error, but got a different error type: {:?}",
        err
    );

    let error_message = err.to_string();
    assert!(
        error_message.contains("`value` should have type double but type was int32"),
        "Error message should specify the expected (double) and actual (int32) types for 'value', but was: {}",
        error_message
    );

    // Missing columns are legal for a subschema, so they must not be reported.
    assert!(
        !error_message.contains("missing="),
        "Error message should NOT complain about missing fields for a subschema check, but was: {}",
        error_message
    );
}
}
Loading