From 0eb32af3a116ad45fdbe8a509093bfa0915aa391 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 12 Mar 2025 21:50:10 -0400 Subject: [PATCH 1/5] feat: Infer partition values statistics --- .../src/writer/file_writer/parquet_writer.rs | 56 +++++++++++++++++-- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index bed9cc3ddc..49271dc723 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -42,10 +42,11 @@ use crate::arrow::{ }; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ - visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, - NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, StructType, - TableMetadata, Type, + visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, + MapType, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, + Struct, StructType, TableMetadata, Type, }; +use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; @@ -454,7 +455,7 @@ impl ParquetWriter { let mut per_col_size: HashMap = HashMap::new(); let mut per_col_val_num: HashMap = HashMap::new(); let mut per_col_null_val_num: HashMap = HashMap::new(); - let mut min_max_agg = MinMaxColAggregator::new(schema); + let mut min_max_agg = MinMaxColAggregator::new(schema.clone()); for row_group in metadata.row_groups() { for column_chunk_metadata in row_group.columns() { @@ -509,6 +510,53 @@ impl ParquetWriter { Ok(builder) } + + fn partition_value_from_statistics( + table_spec: Arc, + lower_bounds: &HashMap, + upper_bounds: &HashMap, + ) -> Result { + let mut partition_literals: Vec> = Vec::new(); + + for field in table_spec.fields() { + if let (Some(lower), Some(upper)) = ( + lower_bounds.get(&field.field_id), + upper_bounds.get(&field.field_id), + ) { + if !field.transform.preserves_order() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "cannot infer partition value for non linear partition field (needs to preserve order): {} with transform {}", + field.name, field.transform + ), + )); + } + + if lower != upper { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "multiple partition values for field {}: lower: {:?}, upper: {:?}", + field.name, lower, upper + ), + )); + } + + let transform_fn = create_transform_function(&field.transform)?; + let transform_literal = + Literal::from(transform_fn.transform_literal_result(&lower)?); + + partition_literals.push(Some(transform_literal)); + } else { + partition_literals.push(None); + } + } + + let partition_struct = Struct::from_iter(partition_literals); + + Ok(partition_struct) + } } impl FileWriter for ParquetWriter { From 871e1cf6fdbdc7308de78ed3095727e839ccc646 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 12 Mar 2025 21:55:55 -0400 Subject: [PATCH 2/5] clippy --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 49271dc723..30450d7a5d 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -455,7 +455,7 @@ impl ParquetWriter { let mut per_col_size: HashMap = HashMap::new(); let mut per_col_val_num: HashMap = HashMap::new(); let mut per_col_null_val_num: HashMap = HashMap::new(); - let mut min_max_agg = MinMaxColAggregator::new(schema.clone()); + let mut min_max_agg = MinMaxColAggregator::new(schema); for row_group in metadata.row_groups() { for column_chunk_metadata in row_group.columns() { @@ -511,6 +511,7 @@ impl ParquetWriter { Ok(builder) } + #[allow(dead_code)] fn partition_value_from_statistics( table_spec: Arc, lower_bounds: &HashMap, @@ -545,7 +546,7 @@ impl ParquetWriter { let transform_fn = create_transform_function(&field.transform)?; let transform_literal = - Literal::from(transform_fn.transform_literal_result(&lower)?); + Literal::from(transform_fn.transform_literal_result(lower)?); partition_literals.push(Some(transform_literal)); } else { From f6dd3f68839261420999c8a147b37b1ab6225a76 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 2 Apr 2025 17:45:31 -0400 Subject: [PATCH 3/5] change function name --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 39da1c0ac0..c926b6fcf3 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -461,7 +461,7 @@ impl ParquetWriter { } #[allow(dead_code)] - fn partition_value_from_statistics( + fn partition_value_from_bounds ( table_spec: Arc, lower_bounds: &HashMap, upper_bounds: &HashMap, From a060a1f4f3ac6aa6718e60dcd6016a7590b6ce22 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Wed, 2 Apr 2025 17:55:22 -0400 Subject: [PATCH 4/5] fmt --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index c926b6fcf3..8089a57919 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -461,7 +461,7 @@ impl ParquetWriter { } #[allow(dead_code)] - fn partition_value_from_bounds ( + fn partition_value_from_bounds( table_spec: Arc, lower_bounds: &HashMap, upper_bounds: &HashMap, From 7c59336e50afe35b99cbcc086e9fd9bdb2b83c2c Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 7 Apr 2025 19:44:26 -0400 Subject: [PATCH 5/5] fix --- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 8089a57919..32ee2bdfff 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -470,8 +470,8 @@ impl ParquetWriter { for field in table_spec.fields() { if let (Some(lower), Some(upper)) = ( - lower_bounds.get(&field.field_id), - upper_bounds.get(&field.field_id), + lower_bounds.get(&field.source_id), + upper_bounds.get(&field.source_id), ) { if !field.transform.preserves_order() { return Err(Error::new(