Merged
78 changes: 78 additions & 0 deletions rust/lance/src/dataset/write/merge_insert.rs
@@ -1429,6 +1429,8 @@ impl MergeInsertJob {
compare_metadata: false,
// Allow nullable source fields for non-nullable targets.
compare_nullability: NullabilityComparison::Ignore,
// Allow columns to be in a different order; they will be matched by name.
ignore_field_order: true,
Contributor:
in what case would we not want this behavior?

Contributor Author:
It doesn't seem like we assert field order anywhere right now, but it seems like a useful capability. The Arrow Schema PartialEq implementation is strict about field order, and there are places, such as in query plans, where field order matters.
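
A minimal sketch of that strictness, assuming the arrow_schema crate (illustrative only, not part of this diff):

use arrow_schema::{DataType, Field, Schema};

#[test]
fn schema_partial_eq_is_order_sensitive() {
    // Same fields, different order.
    let a = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("value", DataType::Float64, true),
    ]);
    let b = Schema::new(vec![
        Field::new("value", DataType::Float64, true),
        Field::new("id", DataType::Int32, false),
    ]);

    // PartialEq compares fields positionally, so reordering makes the
    // schemas unequal, even though matching by name would find the same fields.
    assert_ne!(a, b);
}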

..Default::default()
},
);
@@ -5236,4 +5238,80 @@ MergeInsert: on=[id], when_matched=UpdateAll, when_not_matched=InsertAll, when_n
}
}
}

/// Test case for Issue #5323: merge_insert should use the full schema path
/// when columns are provided in a different order than the dataset schema.
#[tokio::test]
async fn test_merge_insert_reordered_columns() {
use arrow_array::record_batch;

let initial_data = record_batch!(
("id", Int32, [1, 2, 3]),
("value", Float64, [1.1, 2.2, 3.3]),
("extra", Utf8, ["a", "b", "c"])
)
.unwrap();

let dataset = Dataset::write(
RecordBatchIterator::new(vec![Ok(initial_data.clone())], initial_data.schema()),
"memory://test_issue_5323",
None,
)
.await
.unwrap();

// Source data with reordered columns: [extra, id, value] instead of [id, value, extra]
let new_data = record_batch!(
("extra", Utf8, ["x", "y"]),
("id", Int32, [2, 4]), // id 2 exists, 4 is new
("value", Float64, [22.2, 44.4])
)
.unwrap();

// Verify reordered columns can use the fast path
let job = MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()])
.unwrap()
.when_matched(WhenMatched::UpdateAll)
.when_not_matched(WhenNotMatched::InsertAll)
.try_build()
.unwrap();
assert!(
job.can_use_create_plan(&new_data.schema()).await.unwrap(),
"Reordered schema should be able to use fast path"
);

// Execute and verify data correctness
let (merged_dataset, _) =
MergeInsertBuilder::try_new(Arc::new(dataset), vec!["id".to_string()])
.unwrap()
.when_matched(WhenMatched::UpdateAll)
.when_not_matched(WhenNotMatched::InsertAll)
.try_build()
.unwrap()
.execute_reader(Box::new(RecordBatchIterator::new(
vec![Ok(new_data.clone())],
new_data.schema(),
)))
.await
.unwrap();

let result = merged_dataset
.scan()
.order_by(Some(vec![ColumnOrdering::asc_nulls_first(
"id".to_string(),
)]))
.unwrap()
.try_into_batch()
.await
.unwrap();

let expected = record_batch!(
("id", Int32, [1, 2, 3, 4]),
("value", Float64, [1.1, 22.2, 3.3, 44.4]),
("extra", Utf8, ["a", "x", "c", "y"])
)
.unwrap();

assert_eq!(result, expected);
}
}