diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 250fc5e8d9..aceeae49f7 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -911,6 +911,7 @@ mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; // Load the deletes - should handle both types without error diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 14b5124ee6..d05e028997 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -141,8 +141,8 @@ impl DeleteFilter { return Ok(None); } - // TODO: handle case-insensitive case - let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?; + let bound_predicate = combined_predicate + .bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?; Ok(Some(bound_predicate)) } @@ -211,8 +211,9 @@ pub(crate) mod tests { use super::*; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; + use crate::expr::Reference; use crate::io::FileIO; - use crate::spec::{DataFileFormat, Schema}; + use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; type ArrowSchemaRef = Arc; @@ -344,6 +345,7 @@ pub(crate) mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }, FileScanTask { start: 0, @@ -358,6 +360,7 @@ pub(crate) mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }, ]; @@ -380,4 +383,57 @@ pub(crate) mod tests { ]; Arc::new(arrow_schema::Schema::new(fields)) } + + #[tokio::test] + async fn test_build_equality_delete_predicate_case_sensitive() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(), + ); + + // ---------- fake FileScanTask ---------- + let task = FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "data.parquet".to_string(), + data_file_format: crate::spec::DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![], + predicate: None, + deletes: vec![FileScanTaskDeleteFile { + file_path: "eq-del.parquet".to_string(), + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: None, + }], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + }; + + let filter = DeleteFilter::default(); + + // ---------- insert equality delete predicate ---------- + let pred = Reference::new("id").equal_to(Datum::long(10)); + + let (tx, rx) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del.parquet", rx); + + tx.send(pred).unwrap(); + + // ---------- should FAIL ---------- + let result = filter.build_equality_delete_predicate(&task).await; + + assert!( + result.is_err(), + "case_sensitive=true should fail when column case mismatches" + ); + } } diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 6209c1e261..f7f90663a5 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -2082,6 +2082,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -2403,6 +2404,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; // Task 2: read the second and third row groups @@ -2419,6 +2421,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream; @@ -2546,6 +2549,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -2717,6 +2721,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -2934,6 +2939,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3144,6 +3150,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3247,6 +3254,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3344,6 +3352,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3430,6 +3439,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3530,6 +3540,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3659,6 +3670,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3755,6 +3767,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3864,6 +3877,7 @@ message schema { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -4003,6 +4017,7 @@ message schema { partition: Some(partition_data), partition_spec: Some(partition_spec), name_mapping: None, + case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index f28b6b0901..169d8e6405 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -46,6 +46,7 @@ pub(crate) struct ManifestFileContext { snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, delete_file_index: DeleteFileIndex, + case_sensitive: bool, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext { pub partition_spec_id: i32, pub snapshot_schema: SchemaRef, pub delete_file_index: DeleteFileIndex, + pub case_sensitive: bool, } impl ManifestFileContext { @@ -89,6 +91,7 @@ impl ManifestFileContext { bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), delete_file_index: delete_file_index.clone(), + case_sensitive: self.case_sensitive, }; sender @@ -135,6 +138,7 @@ impl ManifestEntryContext { partition_spec: None, // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default" name_mapping: None, + case_sensitive: self.case_sensitive, }) } } @@ -277,6 +281,7 @@ impl PlanContext { field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, + case_sensitive: self.case_sensitive, } } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 1f7fa50df8..c055c12c9a 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -1885,6 +1885,7 @@ pub mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; test_fn(task); @@ -1902,6 +1903,7 @@ pub mod tests { partition: None, partition_spec: None, name_mapping: None, + case_sensitive: false, }; test_fn(task); } diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index e1ef241a57..5349a9bdd2 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -104,6 +104,9 @@ pub struct FileScanTask { #[serde(serialize_with = "serialize_not_implemented")] #[serde(deserialize_with = "deserialize_not_implemented")] pub name_mapping: Option>, + + /// Whether this scan task should treat column names as case-sensitive when binding predicates. + pub case_sensitive: bool, } impl FileScanTask {