diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index d89e08c7d4a66..3ea7321ef3b4b 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -46,6 +46,7 @@ pub mod file_options;
 pub mod format;
 pub mod hash_utils;
 pub mod instant;
+pub mod nested_struct;
 mod null_equality;
 pub mod parsers;
 pub mod pruning;
diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs
new file mode 100644
index 0000000000000..f349b360f2385
--- /dev/null
+++ b/datafusion/common/src/nested_struct.rs
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result, _plan_err};
+use arrow::{
+    array::{new_null_array, Array, ArrayRef, StructArray},
+    compute::cast,
+    datatypes::{DataType::Struct, Field, FieldRef},
+};
+use std::sync::Arc;
+
+/// Cast a struct column to match target struct fields, handling nested structs recursively.
+///
+/// This function implements struct-to-struct casting with the assumption that **structs should
+/// always be allowed to cast to other structs**. However, the source column must already be
+/// a struct type; non-struct sources result in a plan error.
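+///
+/// For example (a schematic sketch only; the concrete behaviour is exercised by the tests in this file):
+///
+/// ```text
+/// // Source rows:   {a: 1}            {a: 2}
+/// // Target fields: [a: Int32, b: Utf8]
+/// // Result rows:   {a: 1, b: null}   {a: 2, b: null}
+/// ```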
+///
+/// ## Field Matching Strategy
+/// - **By Name**: Source struct fields are matched to target fields by name (case-sensitive)
+/// - **Type Adaptation**: When a matching field is found, it is recursively cast to the target field's type
+/// - **Missing Fields**: Target fields not present in the source are filled with null values
+/// - **Extra Fields**: Source fields not present in the target are ignored
+///
+/// ## Nested Struct Handling
+/// - Nested structs are handled recursively using the same casting rules
+/// - Each level of nesting follows the same field matching and null-filling strategy
+/// - This allows for complex struct transformations while maintaining data integrity
+///
+/// # Arguments
+/// * `source_col` - The source array to cast (must be a struct array)
+/// * `target_fields` - The target struct field definitions to cast to
+///
+/// # Returns
+/// A `Result<ArrayRef>` containing the cast struct array
+///
+/// # Errors
+/// Returns a `DataFusionError::Plan` if the source column is not a struct type
+fn cast_struct_column(
+    source_col: &ArrayRef,
+    target_fields: &[Arc<Field>],
+) -> Result<ArrayRef> {
+    if let Some(struct_array) = source_col.as_any().downcast_ref::<StructArray>() {
+        let mut children: Vec<(Arc<Field>, Arc<dyn Array>)> = Vec::new();
+        let num_rows = source_col.len();
+
+        for target_child_field in target_fields {
+            let field_arc = Arc::clone(target_child_field);
+            match struct_array.column_by_name(target_child_field.name()) {
+                Some(source_child_col) => {
+                    let adapted_child =
+                        cast_column(source_child_col, target_child_field)?;
+                    children.push((field_arc, adapted_child));
+                }
+                None => {
+                    children.push((
+                        field_arc,
+                        new_null_array(target_child_field.data_type(), num_rows),
+                    ));
+                }
+            }
+        }
+
+        let struct_array = StructArray::from(children);
+        Ok(Arc::new(struct_array))
+    } else {
+        // Return an error if the source is not a struct type
+        Err(DataFusionError::Plan(format!(
+            "Cannot cast column of type {:?} to struct type. Source must be a struct to cast to struct.",
+            source_col.data_type()
+        )))
+    }
+}
+
+/// Cast a column to match the target field type, with special handling for nested structs.
+///
+/// This function is the main entry point for column casting. For struct
+/// types, it enforces that **only struct columns can be cast to struct types**.
+///
+/// ## Casting Behavior
+/// - **Struct Types**: Delegates to `cast_struct_column` for struct-to-struct casting only
+/// - **Non-Struct Types**: Uses Arrow's standard `cast` kernel for all other conversions
+///
+/// ## Struct Casting Requirements
+/// The struct casting logic requires that the source column already be a struct type.
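+///
+/// For illustration, a sketch mirroring the unit tests below (names are illustrative):
+///
+/// ```text
+/// // `source` is an ArrayRef holding Struct{a: Int32} rows {a: 1} and {a: 2}
+/// let target = Field::new(
+///     "s",
+///     Struct(vec![
+///         Arc::new(Field::new("a", DataType::Int64, true)),
+///         Arc::new(Field::new("b", DataType::Utf8, true)),
+///     ].into()),
+///     true,
+/// );
+/// let adapted = cast_column(&source, &target)?;
+/// // "a" is cast Int32 -> Int64, "b" is added as an all-null Utf8 child
+/// ```
+///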
+/// This makes the function useful for:
+/// - Schema evolution scenarios where struct layouts change over time
+/// - Data migration between different struct schemas
+/// - Type-safe data processing pipelines that maintain struct type integrity
+///
+/// # Arguments
+/// * `source_col` - The source array to cast
+/// * `target_field` - The target field definition (including type and metadata)
+///
+/// # Returns
+/// A `Result<ArrayRef>` containing the cast array
+///
+/// # Errors
+/// Returns an error if:
+/// - Attempting to cast a non-struct column to a struct type
+/// - Arrow's cast function fails for non-struct types
+/// - Memory allocation fails during struct construction
+/// - Invalid data type combinations are encountered
+pub fn cast_column(source_col: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
+    match target_field.data_type() {
+        Struct(target_fields) => cast_struct_column(source_col, target_fields),
+        _ => Ok(cast(source_col, target_field.data_type())?),
+    }
+}
+
+/// Validates compatibility between source and target struct fields for casting operations.
+///
+/// This function implements comprehensive struct compatibility checking by examining:
+/// - Field name matching between source and target structs
+/// - Type castability for each matching field (including recursive struct validation)
+/// - Proper handling of missing fields (target fields not in source are allowed - filled with nulls)
+/// - Proper handling of extra fields (source fields not in target are allowed - ignored)
+///
+/// # Compatibility Rules
+/// - **Field Matching**: Fields are matched by name (case-sensitive)
+/// - **Missing Target Fields**: Allowed - will be filled with null values during casting
+/// - **Extra Source Fields**: Allowed - will be ignored during casting
+/// - **Type Compatibility**: Each matching field must be castable using Arrow's type system
+/// - **Nested Structs**: Recursively validates nested struct compatibility
+///
+/// # Arguments
+/// * `source_fields` - Fields from the source struct type
+/// * `target_fields` - Fields from the target struct type
+///
+/// # Returns
+/// * `Ok(true)` if the structs are compatible for casting
+/// * `Err(DataFusionError)` with a detailed error message if incompatible
+///
+/// # Examples
+/// ```text
+/// // Compatible: source has extra field, target has missing field
+/// // Source: {a: i32, b: string, c: f64}
+/// // Target: {a: i64, d: bool}
+/// // Result: Ok(true) - 'a' can cast i32->i64, 'b','c' ignored, 'd' filled with nulls
+///
+/// // Incompatible: matching field has incompatible types
+/// // Source: {a: binary}
+/// // Target: {a: i32}
+/// // Result: Err(...) - binary cannot cast to i32
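+///
+/// // Compatible (illustrative): a nested struct gains a new sub-field
+/// // Source: {outer: {x: i32}}
+/// // Target: {outer: {x: i32, y: string}}
+/// // Result: Ok(true) - 'y' will be null-filled when the cast is applied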
+/// ```
+pub fn validate_struct_compatibility(
+    source_fields: &[FieldRef],
+    target_fields: &[FieldRef],
+) -> Result<bool> {
+    // Check compatibility for each target field
+    for target_field in target_fields {
+        // Look for matching field in source by name
+        if let Some(source_field) = source_fields
+            .iter()
+            .find(|f| f.name() == target_field.name())
+        {
+            // Check if the matching field types are compatible
+            match (source_field.data_type(), target_field.data_type()) {
+                // Recursively validate nested structs
+                (Struct(source_nested), Struct(target_nested)) => {
+                    validate_struct_compatibility(source_nested, target_nested)?;
+                }
+                // For non-struct types, use the existing castability check
+                _ => {
+                    if !arrow::compute::can_cast_types(
+                        source_field.data_type(),
+                        target_field.data_type(),
+                    ) {
+                        return _plan_err!(
+                            "Cannot cast struct field '{}' from type {:?} to type {:?}",
+                            target_field.name(),
+                            source_field.data_type(),
+                            target_field.data_type()
+                        );
+                    }
+                }
+            }
+        }
+        // Missing fields in source are OK - they'll be filled with nulls
+    }
+
+    // Extra fields in source are OK - they'll be ignored
+    Ok(true)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::{
+        array::{Int32Array, Int64Array, StringArray},
+        datatypes::{DataType, Field},
+    };
+
+    /// Macro to extract and downcast a column from a StructArray
+    macro_rules! get_column_as {
+        ($struct_array:expr, $column_name:expr, $array_type:ty) => {
+            $struct_array
+                .column_by_name($column_name)
+                .unwrap()
+                .as_any()
+                .downcast_ref::<$array_type>()
+                .unwrap()
+        };
+    }
+
+    #[test]
+    fn test_cast_simple_column() {
+        let source = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let target_field = Field::new("ints", DataType::Int64, true);
+        let result = cast_column(&source, &target_field).unwrap();
+        let result = result.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(result.len(), 3);
+        assert_eq!(result.value(0), 1);
+        assert_eq!(result.value(1), 2);
+        assert_eq!(result.value(2), 3);
+    }
+
+    #[test]
+    fn test_cast_struct_with_missing_field() {
+        let a_array = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
+        let source_struct = StructArray::from(vec![(
+            Arc::new(Field::new("a", DataType::Int32, true)),
+            Arc::clone(&a_array),
+        )]);
+        let source_col = Arc::new(source_struct) as ArrayRef;
+
+        let target_field = Field::new(
+            "s",
+            Struct(
+                vec![
+                    Arc::new(Field::new("a", DataType::Int32, true)),
+                    Arc::new(Field::new("b", DataType::Utf8, true)),
+                ]
+                .into(),
+            ),
+            true,
+        );
+
+        let result = cast_column(&source_col, &target_field).unwrap();
+        let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
+        assert_eq!(struct_array.fields().len(), 2);
+        let a_result = get_column_as!(&struct_array, "a", Int32Array);
+        assert_eq!(a_result.value(0), 1);
+        assert_eq!(a_result.value(1), 2);
+
+        let b_result = get_column_as!(&struct_array, "b", StringArray);
+        assert_eq!(b_result.len(), 2);
+        assert!(b_result.is_null(0));
+        assert!(b_result.is_null(1));
+    }
+
+    #[test]
+    fn test_cast_struct_source_not_struct() {
+        let source = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef;
+        let target_field = Field::new(
+            "s",
+            Struct(vec![Arc::new(Field::new("a", DataType::Int32, true))].into()),
+            true,
+        );
+
+        let result = cast_column(&source, &target_field);
+        assert!(result.is_err());
+        let error_msg = result.unwrap_err().to_string();
+        assert!(error_msg.contains("Cannot cast column of type"));
+        assert!(error_msg.contains("to struct type"));
+        assert!(error_msg.contains("Source must be a struct"));
must be a struct")); + } + + #[test] + fn test_validate_struct_compatibility_incompatible_types() { + // Source struct: {field1: Binary, field2: String} + let source_fields = vec![ + Arc::new(Field::new("field1", DataType::Binary, true)), + Arc::new(Field::new("field2", DataType::Utf8, true)), + ]; + + // Target struct: {field1: Int32} + let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))]; + + let result = validate_struct_compatibility(&source_fields, &target_fields); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("Cannot cast struct field 'field1'")); + assert!(error_msg.contains("Binary")); + assert!(error_msg.contains("Int32")); + } + + #[test] + fn test_validate_struct_compatibility_compatible_types() { + // Source struct: {field1: Int32, field2: String} + let source_fields = vec![ + Arc::new(Field::new("field1", DataType::Int32, true)), + Arc::new(Field::new("field2", DataType::Utf8, true)), + ]; + + // Target struct: {field1: Int64} (Int32 can cast to Int64) + let target_fields = vec![Arc::new(Field::new("field1", DataType::Int64, true))]; + + let result = validate_struct_compatibility(&source_fields, &target_fields); + assert!(result.is_ok()); + assert!(result.unwrap()); + } + + #[test] + fn test_validate_struct_compatibility_missing_field_in_source() { + // Source struct: {field2: String} (missing field1) + let source_fields = vec![Arc::new(Field::new("field2", DataType::Utf8, true))]; + + // Target struct: {field1: Int32} + let target_fields = vec![Arc::new(Field::new("field1", DataType::Int32, true))]; + + // Should be OK - missing fields will be filled with nulls + let result = validate_struct_compatibility(&source_fields, &target_fields); + assert!(result.is_ok()); + assert!(result.unwrap()); + } +} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index b3d69064ff15b..94d651ddadd5c 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -51,11 +51,8 @@ pub use datafusion_physical_expr::create_ordering; #[cfg(all(test, feature = "parquet"))] mod tests { - use datafusion_datasource::schema_adapter::{ - DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, - }; - use crate::prelude::SessionContext; + use ::object_store::{path::Path, ObjectMeta}; use arrow::{ array::{Int32Array, StringArray}, datatypes::{DataType, Field, Schema, SchemaRef}, @@ -63,13 +60,17 @@ mod tests { }; use datafusion_common::{record_batch, test_util::batches_to_sort_string}; use datafusion_datasource::{ - file::FileSource, file_scan_config::FileScanConfigBuilder, - source::DataSourceExec, PartitionedFile, + file::FileSource, + file_scan_config::FileScanConfigBuilder, + schema_adapter::{ + DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, + SchemaMapper, + }, + source::DataSourceExec, + PartitionedFile, }; use datafusion_datasource_parquet::source::ParquetSource; - use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::collect; - use object_store::{path::Path, ObjectMeta}; use std::{fs, sync::Arc}; use tempfile::TempDir; @@ -79,6 +80,7 @@ mod tests { // record batches returned from parquet. This can be useful for schema evolution // where older files may not have all columns. 
+        use datafusion_execution::object_store::ObjectStoreUrl;
         let tmp_dir = TempDir::new().unwrap();
         let table_dir = tmp_dir.path().join("parquet_test");
         fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
diff --git a/datafusion/datasource/src/schema_adapter.rs b/datafusion/datasource/src/schema_adapter.rs
index 519be97a81021..b43041c8d14db 100644
--- a/datafusion/datasource/src/schema_adapter.rs
+++ b/datafusion/datasource/src/schema_adapter.rs
@@ -20,13 +20,20 @@
 //! Adapter provides a method of translating the RecordBatches that come out of the
 //! physical format into how they should be used by DataFusion. For instance, a schema
 //! can be stored external to a parquet file that maps parquet logical types to arrow types.
-
-use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
-use arrow::compute::{can_cast_types, cast};
-use arrow::datatypes::{Field, Schema, SchemaRef};
-use datafusion_common::{plan_err, ColumnStatistics};
-use std::fmt::Debug;
-use std::sync::Arc;
+use arrow::{
+    array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions},
+    compute::can_cast_types,
+    datatypes::{DataType, Field, Schema, SchemaRef},
+};
+use datafusion_common::{
+    nested_struct::{cast_column, validate_struct_compatibility},
+    plan_err, ColumnStatistics,
+};
+use std::{fmt::Debug, sync::Arc};
+/// Function used by [`SchemaMapping`] to adapt a column from the file schema to
+/// the table schema.
+pub type CastColumnFn =
+    dyn Fn(&ArrayRef, &Field) -> datafusion_common::Result<ArrayRef> + Send + Sync;
 
 /// Factory for creating [`SchemaAdapter`]
 ///
@@ -232,15 +239,22 @@ pub(crate) fn can_cast_field(
     file_field: &Field,
     table_field: &Field,
 ) -> datafusion_common::Result<bool> {
-    if can_cast_types(file_field.data_type(), table_field.data_type()) {
-        Ok(true)
-    } else {
-        plan_err!(
-            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-            file_field.name(),
-            file_field.data_type(),
-            table_field.data_type()
-        )
+    match (file_field.data_type(), table_field.data_type()) {
+        (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
+            validate_struct_compatibility(source_fields, target_fields)
+        }
+        _ => {
+            if can_cast_types(file_field.data_type(), table_field.data_type()) {
+                Ok(true)
+            } else {
+                plan_err!(
+                    "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+                    file_field.name(),
+                    file_field.data_type(),
+                    table_field.data_type()
+                )
+            }
+        }
     }
 }
 
@@ -277,6 +291,7 @@ impl SchemaAdapter for DefaultSchemaAdapter {
             Arc::new(SchemaMapping::new(
                 Arc::clone(&self.projected_table_schema),
                 field_mappings,
+                Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
             )),
             projection,
         ))
@@ -323,7 +338,6 @@ where
 /// `projected_table_schema` as it can only operate on the projected fields.
 ///
 /// [`map_batch`]: Self::map_batch
-#[derive(Debug)]
 pub struct SchemaMapping {
     /// The schema of the table. This is the expected schema after conversion
     /// and it should match the schema of the query result.
@@ -334,6 +348,19 @@ pub struct SchemaMapping {
     /// They are Options instead of just plain `usize`s because the table could
     /// have fields that don't exist in the file.
     field_mappings: Vec<Option<usize>>,
+    /// Function used to adapt a column from the file schema to the table schema
+    /// when it exists in both schemas
+    cast_column: Arc<CastColumnFn>,
+}
+
+impl Debug for SchemaMapping {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SchemaMapping")
+            .field("projected_table_schema", &self.projected_table_schema)
+            .field("field_mappings", &self.field_mappings)
+            .field("cast_column", &"<function>")
+            .finish()
+    }
 }
 
 impl SchemaMapping {
@@ -343,10 +370,12 @@ impl SchemaMapping {
     pub fn new(
         projected_table_schema: SchemaRef,
         field_mappings: Vec<Option<usize>>,
+        cast_column: Arc<CastColumnFn>,
     ) -> Self {
         Self {
             projected_table_schema,
             field_mappings,
+            cast_column,
         }
     }
 }
@@ -373,9 +402,9 @@ impl SchemaMapper for SchemaMapping {
                         // If this field only exists in the table, and not in the file, then we know
                         // that it's null, so just return that.
                         || Ok(new_null_array(field.data_type(), batch_rows)),
-                        // However, if it does exist in both, then try to cast it to the correct output
-                        // type
-                        |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
+                        // However, if it does exist in both, use the cast_column function
+                        // to perform any necessary conversions
+                        |batch_idx| (self.cast_column)(&batch_cols[batch_idx], field),
                     )
             })
             .collect::<Result<Vec<_>, _>>()?;
@@ -421,10 +450,14 @@ impl SchemaMapper for SchemaMapping {
 
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::{DataType, Field};
-    use datafusion_common::{stats::Precision, Statistics};
-
     use super::*;
+    use arrow::{
+        array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray},
+        compute::cast,
+        datatypes::{DataType, Field, TimeUnit},
+        record_batch::RecordBatch,
+    };
+    use datafusion_common::{stats::Precision, Result, ScalarValue, Statistics};
 
     #[test]
     fn test_schema_mapping_map_statistics_basic() {
@@ -595,8 +628,11 @@
         let field_mappings = vec![Some(1), Some(0)];
 
         // Create SchemaMapping manually
-        let mapping =
-            SchemaMapping::new(Arc::clone(&projected_schema), field_mappings.clone());
+        let mapping = SchemaMapping::new(
+            Arc::clone(&projected_schema),
+            field_mappings.clone(),
+            Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
+        );
 
         // Check that fields were set correctly
         assert_eq!(*mapping.projected_table_schema, *projected_schema);
@@ -700,4 +736,277 @@ mod tests {
         assert_eq!(c_array.len(), 2);
         assert_eq!(c_array.null_count(), 2);
     }
+
+    #[test]
+    fn test_adapt_struct_with_added_nested_fields() -> Result<()> {
+        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
+        let batch = create_test_batch_with_struct_data(&file_schema)?;
+
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
+        let mapped_batch = mapper.map_batch(batch)?;
+
+        verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?;
+        Ok(())
+    }
+
+    #[test]
+    fn test_map_column_statistics_struct() -> Result<()> {
+        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
+
+        let adapter = DefaultSchemaAdapter {
+            projected_table_schema: Arc::clone(&table_schema),
+        };
+        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
+
+        let file_stats = vec![
+            create_test_column_statistics(
+                0,
+                100,
+                Some(ScalarValue::Int32(Some(1))),
+                Some(ScalarValue::Int32(Some(100))),
+                Some(ScalarValue::Int32(Some(5100))),
+            ),
+            create_test_column_statistics(10, 50, None, None, None),
+        ];
+
+        let table_stats = mapper.map_column_statistics(&file_stats)?;
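+        // The projected table schema has a single `info` column, mapped to file column 0,
+        // so only the first file statistic is carried over and the extra entry is ignored.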
+        assert_eq!(table_stats.len(), 1);
+        verify_column_statistics(
+            &table_stats[0],
+            Some(0),
+            Some(100),
+            Some(ScalarValue::Int32(Some(1))),
+            Some(ScalarValue::Int32(Some(100))),
+            Some(ScalarValue::Int32(Some(5100))),
+        );
+
+        let missing_stats = mapper.map_column_statistics(&[])?;
+        assert_eq!(missing_stats.len(), 1);
+        assert_eq!(missing_stats[0], ColumnStatistics::new_unknown());
+        Ok(())
+    }
+
+    fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) {
+        let file_schema = Arc::new(Schema::new(vec![Field::new(
+            "info",
+            DataType::Struct(
+                vec![
+                    Field::new("location", DataType::Utf8, true),
+                    Field::new(
+                        "timestamp_utc",
+                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
+                        true,
+                    ),
+                ]
+                .into(),
+            ),
+            true,
+        )]));
+
+        let table_schema = Arc::new(Schema::new(vec![Field::new(
+            "info",
+            DataType::Struct(
+                vec![
+                    Field::new("location", DataType::Utf8, true),
+                    Field::new(
+                        "timestamp_utc",
+                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
+                        true,
+                    ),
+                    Field::new(
+                        "reason",
+                        DataType::Struct(
+                            vec![
+                                Field::new("_level", DataType::Float64, true),
+                                Field::new(
+                                    "details",
+                                    DataType::Struct(
+                                        vec![
+                                            Field::new("rurl", DataType::Utf8, true),
+                                            Field::new("s", DataType::Float64, true),
+                                            Field::new("t", DataType::Utf8, true),
+                                        ]
+                                        .into(),
+                                    ),
+                                    true,
+                                ),
+                            ]
+                            .into(),
+                        ),
+                        true,
+                    ),
+                ]
+                .into(),
+            ),
+            true,
+        )]));
+
+        (file_schema, table_schema)
+    }
+
+    fn create_test_batch_with_struct_data(
+        file_schema: &SchemaRef,
+    ) -> Result<RecordBatch> {
+        let mut location_builder = StringBuilder::new();
+        location_builder.append_value("San Francisco");
+        location_builder.append_value("New York");
+
+        let timestamp_array = TimestampMillisecondArray::from(vec![
+            Some(1640995200000),
+            Some(1641081600000),
+        ]);
+
+        let timestamp_type =
+            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
+        let timestamp_array = cast(&timestamp_array, &timestamp_type)?;
+
+        let info_struct = StructArray::from(vec![
+            (
+                Arc::new(Field::new("location", DataType::Utf8, true)),
+                Arc::new(location_builder.finish()) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("timestamp_utc", timestamp_type, true)),
+                timestamp_array,
+            ),
+        ]);
+
+        Ok(RecordBatch::try_new(
+            Arc::clone(file_schema),
+            vec![Arc::new(info_struct)],
+        )?)
+    }
+
+    fn verify_adapted_batch_with_nested_fields(
+        mapped_batch: &RecordBatch,
+        table_schema: &SchemaRef,
+    ) -> Result<()> {
+        assert_eq!(mapped_batch.schema(), *table_schema);
+        assert_eq!(mapped_batch.num_rows(), 2);
+
+        let info_col = mapped_batch.column(0);
+        let info_array = info_col
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("Expected info column to be a StructArray");
+
+        verify_preserved_fields(info_array)?;
+        verify_reason_field_structure(info_array)?;
+        Ok(())
+    }
+
+    fn verify_preserved_fields(info_array: &StructArray) -> Result<()> {
+        let location_col = info_array
+            .column_by_name("location")
+            .expect("Expected location field in struct");
+        let location_array = location_col
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("Expected location to be a StringArray");
+        assert_eq!(location_array.value(0), "San Francisco");
+        assert_eq!(location_array.value(1), "New York");
+
+        let timestamp_col = info_array
+            .column_by_name("timestamp_utc")
+            .expect("Expected timestamp_utc field in struct");
+        let timestamp_array = timestamp_col
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .expect("Expected timestamp_utc to be a TimestampMillisecondArray");
+        assert_eq!(timestamp_array.value(0), 1640995200000);
+        assert_eq!(timestamp_array.value(1), 1641081600000);
+        Ok(())
+    }
+
+    fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> {
+        let reason_col = info_array
+            .column_by_name("reason")
+            .expect("Expected reason field in struct");
+        let reason_array = reason_col
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("Expected reason to be a StructArray");
+        assert_eq!(reason_array.fields().len(), 2);
+        assert!(reason_array.column_by_name("_level").is_some());
+        assert!(reason_array.column_by_name("details").is_some());
+
+        let details_col = reason_array
+            .column_by_name("details")
+            .expect("Expected details field in reason struct");
+        let details_array = details_col
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .expect("Expected details to be a StructArray");
+        assert_eq!(details_array.fields().len(), 3);
+        assert!(details_array.column_by_name("rurl").is_some());
+        assert!(details_array.column_by_name("s").is_some());
+        assert!(details_array.column_by_name("t").is_some());
+
+        for i in 0..2 {
+            assert!(reason_array.is_null(i), "reason field should be null");
+        }
+        Ok(())
+    }
+
+    fn verify_column_statistics(
+        stats: &ColumnStatistics,
+        expected_null_count: Option<usize>,
+        expected_distinct_count: Option<usize>,
+        expected_min: Option<ScalarValue>,
+        expected_max: Option<ScalarValue>,
+        expected_sum: Option<ScalarValue>,
+    ) {
+        if let Some(count) = expected_null_count {
+            assert_eq!(
+                stats.null_count,
+                Precision::Exact(count),
+                "Null count should match expected value"
+            );
+        }
+        if let Some(count) = expected_distinct_count {
+            assert_eq!(
+                stats.distinct_count,
+                Precision::Exact(count),
+                "Distinct count should match expected value"
+            );
+        }
+        if let Some(min) = expected_min {
+            assert_eq!(
+                stats.min_value,
+                Precision::Exact(min),
+                "Min value should match expected value"
+            );
+        }
+        if let Some(max) = expected_max {
+            assert_eq!(
+                stats.max_value,
+                Precision::Exact(max),
+                "Max value should match expected value"
+            );
+        }
+        if let Some(sum) = expected_sum {
+            assert_eq!(
+                stats.sum_value,
+                Precision::Exact(sum),
+                "Sum value should match expected value"
+            );
+        }
+    }
+
+    fn create_test_column_statistics(
+        null_count: usize,
+        distinct_count: usize,
+        min_value: Option<ScalarValue>,
+        max_value: Option<ScalarValue>,
+        sum_value: Option<ScalarValue>,
+    ) -> ColumnStatistics {
+        ColumnStatistics {
+            null_count: Precision::Exact(null_count),
+            distinct_count: Precision::Exact(distinct_count),
+            min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact),
+            max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact),
+            sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact),
+        }
+    }
+}