diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index 2bd70221f41e1..f7d51c833a65b 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -18,7 +18,7 @@
 use arrow::datatypes::SchemaRef;
 use arrow::{array::RecordBatch, compute::concat_batches};
 use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
-use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
+use datafusion_common::{config::ConfigOptions, internal_err, Result};
 use datafusion_datasource::{
     file::FileSource, file_scan_config::FileScanConfig,
     file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
@@ -105,7 +105,6 @@ impl FileOpener for TestOpener {
 pub struct TestSource {
     support: bool,
     predicate: Option<Arc<dyn PhysicalExpr>>,
-    statistics: Option<Statistics>,
     batch_size: Option<usize>,
     batches: Vec<RecordBatch>,
     schema: SchemaRef,
@@ -125,7 +124,6 @@ impl TestSource {
             metrics: ExecutionPlanMetricsSet::new(),
             batches,
             predicate: None,
-            statistics: None,
             batch_size: None,
             projection: None,
             schema_adapter_factory: None,
@@ -172,25 +170,10 @@ impl FileSource for TestSource {
         })
     }

-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-        Arc::new(TestSource {
-            statistics: Some(statistics),
-            ..self.clone()
-        })
-    }
-
     fn metrics(&self) -> &ExecutionPlanMetricsSet {
         &self.metrics
     }

-    fn statistics(&self) -> Result<Statistics> {
-        Ok(self
-            .statistics
-            .as_ref()
-            .expect("statistics not set")
-            .clone())
-    }
-
     fn file_type(&self) -> &str {
         "test"
     }
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 60fec2243621d..e410c495c8ce8 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -131,10 +131,7 @@ pub(crate) fn parquet_exec_with_stats(file_size: u64) -> Arc<DataSourceExec> {
         .with_statistics(statistics)
         .build();

-    assert_eq!(
-        config.file_source.statistics().unwrap().num_rows,
-        Precision::Inexact(10000)
-    );
+    assert_eq!(config.statistics().num_rows, Precision::Inexact(10000));

     DataSourceExec::from_data_source(config)
 }
diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs
index 5e4d7c4606d22..3132d8a10d5ca 100644
--- a/datafusion/datasource-arrow/src/source.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -40,7 +40,7 @@ use datafusion_datasource::{as_file_source, TableSchema};
 use arrow::buffer::Buffer;
 use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
 use datafusion_common::error::Result;
-use datafusion_common::{exec_datafusion_err, Statistics};
+use datafusion_common::exec_datafusion_err;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::PartitionedFile;
@@ -250,7 +250,6 @@ pub struct ArrowSource {
     format: ArrowFormat,
     table_schema: TableSchema,
     metrics: ExecutionPlanMetricsSet,
-    projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
@@ -261,7 +260,6 @@ impl ArrowSource {
             format: ArrowFormat::File,
             table_schema: table_schema.into(),
             metrics: ExecutionPlanMetricsSet::new(),
-            projected_statistics: None,
             schema_adapter_factory: None,
         }
     }
@@ -272,7 +270,6 @@ impl ArrowSource {
             format: ArrowFormat::Stream,
             table_schema: table_schema.into(),
             metrics: ExecutionPlanMetricsSet::new(),
-            projected_statistics: None,
             schema_adapter_factory: None,
         }
     }
@@ -305,12 +302,6 @@ impl FileSource for ArrowSource {
         Arc::new(Self { ..self.clone() })
     }

-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-        let mut conf = self.clone();
-        conf.projected_statistics = Some(statistics);
-        Arc::new(conf)
-    }
-
     fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
@@ -319,13 +310,6 @@ impl FileSource for ArrowSource {
         &self.metrics
     }

-    fn statistics(&self) -> Result<Statistics> {
-        let statistics = &self.projected_statistics;
-        Ok(statistics
-            .clone()
-            .expect("projected_statistics must be set"))
-    }
-
     fn file_type(&self) -> &str {
         match self.format {
             ArrowFormat::File => "arrow",
diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs
index 600dbed21cc58..e83113f40ea07 100644
--- a/datafusion/datasource-avro/src/source.rs
+++ b/datafusion/datasource-avro/src/source.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;

 use crate::avro_to_arrow::Reader as AvroReader;
 use datafusion_common::error::Result;
-use datafusion_common::Statistics;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::file_stream::FileOpener;
@@ -41,7 +40,6 @@ pub struct AvroSource {
     batch_size: Option<usize>,
     projection: Option<Vec<String>>,
     metrics: ExecutionPlanMetricsSet,
-    projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
@@ -53,7 +51,6 @@ impl AvroSource {
             batch_size: None,
             projection: None,
             metrics: ExecutionPlanMetricsSet::new(),
-            projected_statistics: None,
             schema_adapter_factory: None,
         }
     }
@@ -95,12 +92,6 @@ impl FileSource for AvroSource {
         Arc::new(conf)
     }

-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-        let mut conf = self.clone();
-        conf.projected_statistics = Some(statistics);
-        Arc::new(conf)
-    }
-
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
         conf.projection = config.projected_file_column_names();
@@ -111,13 +102,6 @@ impl FileSource for AvroSource {
         &self.metrics
     }

-    fn statistics(&self) -> Result<Statistics> {
-        let statistics = &self.projected_statistics;
-        Ok(statistics
-            .clone()
-            .expect("projected_statistics must be set"))
-    }
-
     fn file_type(&self) -> &str {
         "avro"
     }
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index 94c6b3810ae21..b39f5a63a6413 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -34,7 +34,7 @@ use datafusion_datasource::{

 use arrow::csv;
 use datafusion_common::config::CsvOptions;
-use datafusion_common::{DataFusionError, Result, Statistics};
+use datafusion_common::{DataFusionError, Result};
 use datafusion_common_runtime::JoinSet;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
@@ -90,7 +90,6 @@ pub struct CsvSource {
     table_schema: TableSchema,
     file_projection: Option<Vec<usize>>,
     metrics: ExecutionPlanMetricsSet,
-    projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
@@ -103,7 +102,6 @@ impl CsvSource {
             batch_size: None,
             file_projection: None,
             metrics: ExecutionPlanMetricsSet::new(),
-            projected_statistics: None,
             schema_adapter_factory: None,
         }
     }
@@ -266,12 +264,6 @@ impl FileSource for CsvSource {
         Arc::new(conf)
     }

-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-        let mut conf = self.clone();
-        conf.projected_statistics = Some(statistics);
-        Arc::new(conf)
-    }
-
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
         conf.file_projection = config.file_column_projection_indices();
@@ -281,12 +273,7 @@ impl FileSource for CsvSource {
     fn metrics(&self) -> &ExecutionPlanMetricsSet {
         &self.metrics
     }
-    fn statistics(&self) -> Result<Statistics> {
-        let statistics = &self.projected_statistics;
-        Ok(statistics
-            .clone()
-            .expect("projected_statistics must be set"))
-    }
+
     fn file_type(&self) -> &str {
         "csv"
     }
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 44b71ce680fd9..fd382efc75d39 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -37,7 +37,6 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

 use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
-use datafusion_common::Statistics;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_execution::TaskContext;
@@ -79,7 +78,6 @@ pub struct JsonSource {
     table_schema: datafusion_datasource::TableSchema,
     batch_size: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
-    projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
@@ -90,7 +88,6 @@ impl JsonSource {
             table_schema: table_schema.into(),
             batch_size: None,
             metrics: ExecutionPlanMetricsSet::new(),
-            projected_statistics: None,
             schema_adapter_factory: None,
         }
     }
@@ -133,12 +130,6 @@ impl FileSource for JsonSource {
         Arc::new(conf)
     }

-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-        let mut conf = self.clone();
-        conf.projected_statistics = Some(statistics);
-        Arc::new(conf)
-    }
-
     fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
@@ -147,13 +138,6 @@ impl FileSource for JsonSource {
         &self.metrics
     }

-    fn statistics(&self) -> Result<Statistics> {
-        let statistics = &self.projected_statistics;
-        Ok(statistics
-            .clone()
-            .expect("projected_statistics must be set to call"))
-    }
-
     fn file_type(&self) -> &str {
         "json"
     }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 7c07b7b68c357..da7bc125d2f6a 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -37,7 +37,7 @@ use datafusion_datasource::schema_adapter::{

 use arrow::datatypes::TimeUnit;
 use datafusion_common::config::TableParquetOptions;
-use datafusion_common::{DataFusionError, Statistics};
+use datafusion_common::DataFusionError;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::TableSchema;
@@ -286,7 +286,6 @@ pub struct ParquetSource {
     pub(crate) batch_size: Option<usize>,
     /// Optional hint for the size of the parquet metadata
     pub(crate) metadata_size_hint: Option<usize>,
-    pub(crate) projected_statistics: Option<Statistics>,
     #[cfg(feature = "parquet_encryption")]
     pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
 }
@@ -307,7 +306,6 @@ impl ParquetSource {
             schema_adapter_factory: None,
             batch_size: None,
             metadata_size_hint: None,
-            projected_statistics: None,
             #[cfg(feature = "parquet_encryption")]
             encryption_factory: None,
         }
     }
@@ -625,12 +623,6 @@ impl FileSource for ParquetSource {
         Arc::new(conf)
     }

-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-        let mut conf = self.clone();
-        conf.projected_statistics = Some(statistics);
-        Arc::new(conf)
-    }
-
     fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
@@ -639,23 +631,6 @@ impl FileSource for ParquetSource {
         &self.metrics
     }

-    fn statistics(&self) -> datafusion_common::Result<Statistics> {
-        let statistics = &self.projected_statistics;
-        let statistics = statistics
-            .clone()
-            .expect("projected_statistics must be set");
-        // When filters are pushed down, we have no way of knowing the exact statistics.
-        // Note that pruning predicate is also a kind of filter pushdown.
-        // (bloom filters use `pruning_predicate` too).
-        // Because filter pushdown may happen dynamically as long as there is a predicate
-        // if we have *any* predicate applied, we can't guarantee the statistics are exact.
-        if self.filter().is_some() {
-            Ok(statistics.to_inexact())
-        } else {
-            Ok(statistics)
-        }
-    }
-
     fn file_type(&self) -> &str {
         "parquet"
     }
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 9245f60e2306c..9ec34b5dda0cd 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -27,7 +27,7 @@ use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
 use crate::schema_adapter::SchemaAdapterFactory;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{not_impl_err, Result, Statistics};
+use datafusion_common::{not_impl_err, Result};
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
 use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -68,16 +68,12 @@ pub trait FileSource: Send + Sync {
     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
     /// Initialize new instance with projection information
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
-    /// Initialize new instance with projected statistics
-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
     /// Returns the filter expression that will be applied during the file scan.
     fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
         None
     }
     /// Return execution plan metrics
     fn metrics(&self) -> &ExecutionPlanMetricsSet;
-    /// Return projected statistics
-    fn statistics(&self) -> Result<Statistics>;
     /// String representation of file source such as "csv", "json", "parquet"
     fn file_type(&self) -> &str;
     /// Format FileType specific information
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 897206c3d56e3..4387996a29814 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -104,7 +104,6 @@ use log::{debug, warn};
 /// #[derive(Clone)]
 /// # struct ParquetSource {
 /// #    table_schema: TableSchema,
-/// #    projected_statistics: Option<Statistics>,
 /// #    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
 /// # };
 /// # impl FileSource for ParquetSource {
@@ -113,15 +112,13 @@ use log::{debug, warn};
 /// #  fn table_schema(&self) -> &TableSchema { &self.table_schema }
 /// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
 /// #  fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
-/// #  fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {table_schema: self.table_schema.clone(), projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
 /// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
-/// #  fn statistics(&self) -> Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
 /// #  fn file_type(&self) -> &str { "parquet" }
-/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Result<Arc<dyn FileSource>> { Ok(Arc::new(Self {table_schema: self.table_schema.clone(), projected_statistics: self.projected_statistics.clone(), schema_adapter_factory: Some(factory)} )) }
+/// #  fn with_schema_adapter_factory(&self, factory: Arc<dyn SchemaAdapterFactory>) -> Result<Arc<dyn FileSource>> { Ok(Arc::new(Self {table_schema: self.table_schema.clone(), schema_adapter_factory: Some(factory)} )) }
 /// #  fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> { self.schema_adapter_factory.clone() }
 /// # }
 /// # impl ParquetSource {
-/// #  fn new(table_schema: impl Into<TableSchema>) -> Self { Self {table_schema: table_schema.into(), projected_statistics: None, schema_adapter_factory: None} }
+/// #  fn new(table_schema: impl Into<TableSchema>) -> Self { Self {table_schema: table_schema.into(), schema_adapter_factory: None} }
 /// # }
 /// // create FileScan config for reading parquet files from file://
 /// let object_store_url = ObjectStoreUrl::local_filesystem();
@@ -192,6 +189,13 @@ pub struct FileScanConfig {
     /// Expression adapter used to adapt filters and projections that are pushed down into the scan
     /// from the logical schema to the physical schema of the file.
     pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
+    /// Unprojected statistics for the table (file schema + partition columns).
+    /// These are projected on-demand via `projected_stats()`.
+    ///
+    /// Note that this field is pub(crate) because accessing it directly from outside
+    /// would be incorrect if there are filters being applied, thus this should be accessed
+    /// via [`FileScanConfig::statistics`].
+    pub(crate) statistics: Statistics,
 }

 /// A builder for [`FileScanConfig`]'s.
@@ -437,10 +441,9 @@ impl FileScanConfigBuilder {
         let constraints = constraints.unwrap_or_default();

         let statistics = statistics.unwrap_or_else(|| {
-            Statistics::new_unknown(file_source.table_schema().file_schema())
+            Statistics::new_unknown(file_source.table_schema().table_schema())
         });

-        let file_source = file_source.with_statistics(statistics.clone());
         let file_compression_type =
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
         let new_lines_in_values = new_lines_in_values.unwrap_or(false);
@@ -466,6 +469,7 @@ impl FileScanConfigBuilder {
             new_lines_in_values,
             batch_size,
             expr_adapter_factory: expr_adapter,
+            statistics,
         }
     }
 }
@@ -476,7 +480,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
             object_store_url: config.object_store_url,
             file_source: Arc::<dyn FileSource>::clone(&config.file_source),
             file_groups: config.file_groups,
-            statistics: config.file_source.statistics().ok(),
+            statistics: Some(config.statistics),
             output_ordering: config.output_ordering,
             file_compression_type: Some(config.file_compression_type),
             new_lines_in_values: Some(config.new_lines_in_values),
@@ -736,8 +740,21 @@ impl FileScanConfig {
         }
     }

-    pub fn projected_stats(&self) -> Statistics {
-        let statistics = self.file_source.statistics().unwrap();
+    /// Returns the unprojected table statistics, marking them as inexact if filters are present.
+    ///
+    /// When filters are pushed down (including pruning predicates and bloom filters),
+    /// we can't guarantee the statistics are exact because we don't know how many
+    /// rows will be filtered out.
+    pub fn statistics(&self) -> Statistics {
+        if self.file_source.filter().is_some() {
+            self.statistics.clone().to_inexact()
+        } else {
+            self.statistics.clone()
+        }
+    }
+
+    fn projected_stats(&self) -> Statistics {
+        let statistics = self.statistics();

         let table_cols_stats = self
             .projection_indices()
@@ -830,7 +847,7 @@ impl FileScanConfig {
             return (
                 Arc::clone(self.file_schema()),
                 self.constraints.clone(),
-                self.file_source.statistics().unwrap().clone(),
+                self.statistics().clone(),
                 self.output_ordering.clone(),
             );
         }
@@ -1064,11 +1081,7 @@ impl Debug for FileScanConfig {
         write!(f, "FileScanConfig {{")?;
         write!(f, "object_store_url={:?}, ", self.object_store_url)?;

-        write!(
-            f,
-            "statistics={:?}, ",
-            self.file_source.statistics().unwrap()
-        )?;
+        write!(f, "statistics={:?}, ", self.statistics())?;

         DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
         write!(f, "}}")
@@ -1647,7 +1660,7 @@ mod tests {
             to_partition_cols(partition_cols.clone()),
         );

-        let source_statistics = conf.file_source.statistics().unwrap();
+        let source_statistics = conf.statistics();
         let conf_stats = conf.partition_statistics(None).unwrap();

         // projection should be reflected in the file source statistics
@@ -2347,24 +2360,13 @@ mod tests {
         assert!(config.constraints.is_empty());

         // Verify statistics are set to unknown
+        assert_eq!(config.statistics().num_rows, Precision::Absent);
+        assert_eq!(config.statistics().total_byte_size, Precision::Absent);
         assert_eq!(
-            config.file_source.statistics().unwrap().num_rows,
-            Precision::Absent
-        );
-        assert_eq!(
-            config.file_source.statistics().unwrap().total_byte_size,
-            Precision::Absent
-        );
-        assert_eq!(
-            config
-                .file_source
-                .statistics()
-                .unwrap()
-                .column_statistics
-                .len(),
+            config.statistics().column_statistics.len(),
             file_schema.fields().len()
         );
-        for stat in config.file_source.statistics().unwrap().column_statistics {
+        for stat in config.statistics().column_statistics {
             assert_eq!(stat.distinct_count, Precision::Absent);
             assert_eq!(stat.min_value, Precision::Absent);
             assert_eq!(stat.max_value, Precision::Absent);
diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs
index 78ba593f22ec8..5d5b277dcf046 100644
--- a/datafusion/datasource/src/test_util.rs
+++ b/datafusion/datasource/src/test_util.rs
@@ -23,7 +23,7 @@ use crate::{
 use std::sync::Arc;

 use arrow::datatypes::Schema;
-use datafusion_common::{Result, Statistics};
+use datafusion_common::Result;
 use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use object_store::ObjectStore;
@@ -32,7 +32,6 @@ use object_store::ObjectStore;
 #[derive(Clone)]
 pub(crate) struct MockSource {
     metrics: ExecutionPlanMetricsSet,
-    projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
     filter: Option<Arc<dyn PhysicalExpr>>,
     table_schema: crate::table_schema::TableSchema,
 }
@@ -42,7 +41,6 @@ impl Default for MockSource {
     fn default() -> Self {
         Self {
             metrics: ExecutionPlanMetricsSet::new(),
-            projected_statistics: None,
             schema_adapter_factory: None,
             filter: None,
             table_schema: crate::table_schema::TableSchema::new(
@@ -57,7 +55,6 @@ impl MockSource {
     pub fn new(table_schema: impl Into<TableSchema>) -> Self {
         Self {
             metrics: ExecutionPlanMetricsSet::new(),
-            projected_statistics: None,
             schema_adapter_factory: None,
             filter: None,
             table_schema: table_schema.into(),
@@ -96,24 +93,10 @@ impl FileSource for MockSource {
         Arc::new(Self { ..self.clone() })
     }

-    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-        let mut source = self.clone();
-        source.projected_statistics = Some(statistics);
-        Arc::new(source)
-    }
-
     fn metrics(&self) -> &ExecutionPlanMetricsSet {
         &self.metrics
     }

-    fn statistics(&self) -> Result<Statistics> {
-        Ok(self
-            .projected_statistics
-            .as_ref()
-            .expect("projected_statistics must be set")
-            .clone())
-    }
-
     fn file_type(&self) -> &str {
         "mock"
     }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 820628ff70cd6..1ae85618b92ad 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -530,7 +530,7 @@ pub fn serialize_file_scan_config(
     Ok(protobuf::FileScanExecConf {
         file_groups,
-        statistics: Some((&conf.file_source.statistics().unwrap()).into()),
+        statistics: Some((&conf.statistics()).into()),
         limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
         projection: conf
             .projection_exprs
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 73f39eaa7bf95..c50f41625c70d 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -53,7 +53,7 @@ use datafusion::datasource::listing::{
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup,
-    FileScanConfigBuilder, FileSinkConfig, FileSource, ParquetSource,
+    FileScanConfigBuilder, FileSinkConfig, ParquetSource,
 };
 use datafusion::datasource::sink::DataSinkExec;
 use datafusion::datasource::source::DataSourceExec;
@@ -1805,8 +1805,7 @@ async fn roundtrip_projection_source() -> Result<()> {

     let statistics = Statistics::new_unknown(&schema);

-    let file_source =
-        ParquetSource::new(Arc::clone(&schema)).with_statistics(statistics.clone());
+    let file_source = Arc::new(ParquetSource::new(Arc::clone(&schema)));
     let scan_config =
         FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
             .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 8288923008606..0e7edd1887456 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -25,6 +25,87 @@
 You can see the current
 [status of the `52.0.0` release here](https://github.com/apache/datafusion/issues/18566)

+### Statistics handling moved from `FileSource` to `FileScanConfig`
+
+Statistics are now managed directly by `FileScanConfig` instead of being delegated to `FileSource` implementations. This simplifies the `FileSource` trait and provides more consistent statistics handling across all file formats.
+
+**Who is affected:**
+
+- Users who have implemented a custom `FileSource`
+
+**Breaking changes:**
+
+Two methods have been removed from the `FileSource` trait:
+
+- `with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>`
+- `statistics(&self) -> Result<Statistics>`
+
+**Migration guide:**
+
+If you have a custom `FileSource` implementation, you need to:
+
+1. Remove the `with_statistics` method implementation
+2. Remove the `statistics` method implementation
+3. Remove any internal state that was storing statistics
+
+**Before:**
+
+```rust,ignore
+#[derive(Clone)]
+struct MyCustomSource {
+    table_schema: TableSchema,
+    projected_statistics: Option<Statistics>,
+    // other fields...
+}
+
+impl FileSource for MyCustomSource {
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            table_schema: self.table_schema.clone(),
+            projected_statistics: Some(statistics),
+            // other fields...
+        })
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(self.projected_statistics.clone().unwrap_or_else(||
+            Statistics::new_unknown(self.table_schema.file_schema())
+        ))
+    }
+
+    // other methods...
+}
+```
+
+**After:**
+
+```rust,ignore
+#[derive(Clone)]
+struct MyCustomSource {
+    table_schema: TableSchema,
+    // projected_statistics field removed
+    // other fields...
+}
+
+impl FileSource for MyCustomSource {
+    // with_statistics method removed
+    // statistics method removed
+
+    // other methods...
+}
+```
+
+**Accessing statistics:**
+
+Statistics are now accessed through `FileScanConfig` instead of `FileSource`:
+
+```diff
+- let stats = config.file_source.statistics()?;
++ let stats = config.statistics();
+```
+
+Note that `FileScanConfig::statistics()` automatically marks statistics as inexact when filters are present, ensuring correctness when filters are pushed down.
+
 ### Planner now requires explicit opt-in for WITHIN GROUP syntax

 The SQL planner now enforces the aggregate UDF contract more strictly: the