diff --git a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs index a2cd187fee06..df1dacf23a51 100644 --- a/datafusion-examples/examples/custom_data_source/adapter_serialization.rs +++ b/datafusion-examples/examples/custom_data_source/adapter_serialization.rs @@ -317,7 +317,7 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec { extension_codec: &dyn PhysicalExtensionCodec, ) -> Result { // Check if this is a DataSourceExec with adapter - if let Some(exec) = plan.as_any().downcast_ref::() + if let Some(exec) = plan.downcast_ref::() && let Some(config) = exec.data_source().as_any().downcast_ref::() && let Some(adapter_factory) = &config.expr_adapter_factory @@ -481,7 +481,7 @@ fn inject_adapter_into_plan( plan: Arc, adapter_factory: Arc, ) -> Result> { - if let Some(exec) = plan.as_any().downcast_ref::() + if let Some(exec) = plan.downcast_ref::() && let Some(config) = exec.data_source().as_any().downcast_ref::() { let new_config = FileScanConfigBuilder::from(config.clone()) @@ -497,7 +497,7 @@ fn inject_adapter_into_plan( fn verify_adapter_in_plan(plan: &Arc, label: &str) -> bool { // Walk the plan tree to find DataSourceExec with adapter fn check_plan(plan: &dyn ExecutionPlan) -> bool { - if let Some(exec) = plan.as_any().downcast_ref::() + if let Some(exec) = plan.downcast_ref::() && let Some(config) = exec.data_source().as_any().downcast_ref::() && config.expr_adapter_factory.is_some() diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index 71e589dcf6e8..0b441b9d7c8d 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -235,10 +235,6 @@ impl ExecutionPlan for CustomExec { "CustomExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion-examples/examples/data_io/parquet_exec_visitor.rs b/datafusion-examples/examples/data_io/parquet_exec_visitor.rs index 47caf9480df9..d1951b2d9904 100644 --- a/datafusion-examples/examples/data_io/parquet_exec_visitor.rs +++ b/datafusion-examples/examples/data_io/parquet_exec_visitor.rs @@ -104,7 +104,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor { /// or `post_visit` (visit each node after its children/inputs) fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { // If needed match on a specific `ExecutionPlan` node type - if let Some(data_source_exec) = plan.as_any().downcast_ref::() + if let Some(data_source_exec) = plan.downcast_ref::() && let Some((file_config, _)) = data_source_exec.downcast_to_file_source::() { diff --git a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs index 1440347d4413..dc374c7e02fe 100644 --- a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs @@ -43,7 +43,6 @@ use datafusion::physical_plan::{ }; use datafusion::prelude::*; use futures::stream::{StreamExt, TryStreamExt}; -use std::any::Any; use std::fmt; use std::sync::Arc; @@ -226,10 +225,6 @@ impl ExecutionPlan for BufferingExecutionPlan { "BufferingExecutionPlan" } - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index df3d58b7bfb8..ae9503dd87b1 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -32,7 +32,6 @@ //! DeltaScan //! ``` -use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -103,10 +102,6 @@ impl ExecutionPlan for ParentExec { "ParentExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { unreachable!() } @@ -161,7 +156,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec { } fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { - if node.as_any().downcast_ref::().is_some() { + if node.is::() { buf.extend_from_slice("ParentExec".as_bytes()); Ok(()) } else { @@ -188,10 +183,6 @@ impl ExecutionPlan for ChildExec { "ChildExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { unreachable!() } @@ -244,7 +235,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec { } fn try_encode(&self, node: Arc, buf: &mut Vec) -> Result<()> { - if node.as_any().downcast_ref::().is_some() { + if node.is::() { buf.extend_from_slice("ChildExec".as_bytes()); Ok(()) } else { diff --git a/datafusion-examples/examples/proto/expression_deduplication.rs b/datafusion-examples/examples/proto/expression_deduplication.rs index 0dec807f8043..9f3940c82d00 100644 --- a/datafusion-examples/examples/proto/expression_deduplication.rs +++ b/datafusion-examples/examples/proto/expression_deduplication.rs @@ -124,8 +124,7 @@ pub async fn expression_deduplication() -> Result<()> { // Step 5: check that we deduplicated expressions println!("Step 5: Checking for deduplicated expressions..."); - let Some(filter_exec) = deserialized_plan.as_any().downcast_ref::() - else { + let Some(filter_exec) = deserialized_plan.downcast_ref::() else { panic!("Deserialized plan is not a FilterExec"); }; let predicate = Arc::clone(filter_exec.predicate()); diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 04e5efd9706a..42342e5f1a64 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -80,7 +80,6 @@ //! ``` use std::{ - any::Any, fmt::{self, Debug, Formatter}, hash::{Hash, Hasher}, pin::Pin, @@ -682,10 +681,6 @@ impl ExecutionPlan for SampleExec { "SampleExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 9b91062657a0..a0a34a8f6d6a 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -594,10 +594,6 @@ impl ExecutionPlan for DmlResultExec { "DmlResultExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 5dd11739c1f5..d14ec1f56dce 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -107,6 +107,7 @@ impl ListingTableConfigExt for ListingTableConfig { #[cfg(test)] mod tests { + #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::listing::table::ListingTableConfigExt; @@ -404,7 +405,7 @@ mod tests { .await .expect("Empty execution plan"); - assert!(scan.as_any().is::()); + assert!(scan.is::()); assert_eq!( columns(&scan.schema()), vec!["a".to_owned(), "p1".to_owned()] diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 683a16af6cb4..82f3c4d80c9e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -3632,7 +3632,6 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let final_hash_agg = execution_plan - .as_any() .downcast_ref::() .expect("hash aggregate"); assert_eq!( @@ -3660,7 +3659,6 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let final_hash_agg = execution_plan - .as_any() .downcast_ref::() .expect("hash aggregate"); assert_eq!( @@ -3795,7 +3793,7 @@ mod tests { .unwrap(); let plan = plan(&logical_plan).await.unwrap(); - if let Some(plan) = plan.as_any().downcast_ref::() { + if let Some(plan) = plan.downcast_ref::() { let stringified_plans = plan.stringified_plans(); assert!(stringified_plans.len() >= 4); assert!( @@ -3863,7 +3861,7 @@ mod tests { .handle_explain(&explain, &ctx.state()) .await .unwrap(); - if let Some(plan) = plan.as_any().downcast_ref::() { + if let Some(plan) = plan.downcast_ref::() { let stringified_plans = plan.stringified_plans(); assert_eq!(stringified_plans.len(), 1); assert_eq!(stringified_plans[0].plan.as_str(), "Test Err"); @@ -4003,10 +4001,6 @@ mod tests { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -4169,9 +4163,6 @@ digraph { fn schema(&self) -> SchemaRef { Arc::new(Schema::empty()) } - fn as_any(&self) -> &dyn Any { - unimplemented!() - } fn children(&self) -> Vec<&Arc> { self.0.iter().collect::>() } @@ -4224,9 +4215,6 @@ digraph { ) -> Result> { unimplemented!() } - fn as_any(&self) -> &dyn Any { - unimplemented!() - } fn children(&self) -> Vec<&Arc> { unimplemented!() } @@ -4351,9 +4339,6 @@ digraph { ) -> Result> { unimplemented!() } - fn as_any(&self) -> &dyn Any { - unimplemented!() - } fn children(&self) -> Vec<&Arc> { vec![] } @@ -4765,6 +4750,6 @@ digraph { .unwrap(); assert_eq!(plan.schema(), schema); - assert!(plan.as_any().is::()); + assert!(plan.is::()); } } diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 53084fd9a0df..c53495421307 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -196,7 +196,7 @@ impl TestParquetFile { /// Recursively searches for DataSourceExec and returns the metrics /// on the first one it finds pub fn parquet_metrics(plan: &Arc) -> Option { - if let Some(data_source_exec) = plan.as_any().downcast_ref::() + if let Some(data_source_exec) = plan.downcast_ref::() && data_source_exec .downcast_to_file_source::() .is_some() diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 6919d9794b29..c01e65beddd3 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -157,10 +157,6 @@ impl ExecutionPlan for CustomExecutionPlan { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -335,7 +331,7 @@ async fn optimizers_catch_all_statistics() { #[expect(clippy::needless_pass_by_value)] fn contains_place_holder_exec(plan: Arc) -> bool { - if plan.as_any().is::() { + if plan.is::() { true } else if plan.children().len() != 1 { false diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 8078b0a7ec15..82774c8b44dd 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -109,10 +109,6 @@ impl ExecutionPlan for CustomPlan { Self::static_name() } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 561c6b3b246f..63b5398b7072 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -155,10 +155,6 @@ impl ExecutionPlan for StatisticsValidation { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index d64223abdb76..4726e7c4aca5 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -547,7 +547,7 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) { type Node = Arc; fn f_down(&mut self, node: &'n Self::Node) -> Result { - if let Some(exec) = node.as_any().downcast_ref::() { + if let Some(exec) = node.downcast_ref::() { if self.expected_sort { assert!(matches!( exec.input_order_mode(), diff --git a/datafusion/core/tests/fuzz_cases/once_exec.rs b/datafusion/core/tests/fuzz_cases/once_exec.rs index eed172f09f99..403e377a690e 100644 --- a/datafusion/core/tests/fuzz_cases/once_exec.rs +++ b/datafusion/core/tests/fuzz_cases/once_exec.rs @@ -24,7 +24,6 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, }; -use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::{Arc, Mutex}; @@ -80,10 +79,6 @@ impl ExecutionPlan for OnceExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/memory_limit/repartition_mem_limit.rs b/datafusion/core/tests/memory_limit/repartition_mem_limit.rs index b21bffebaf95..27bcd33926e9 100644 --- a/datafusion/core/tests/memory_limit/repartition_mem_limit.rs +++ b/datafusion/core/tests/memory_limit/repartition_mem_limit.rs @@ -74,7 +74,7 @@ async fn test_repartition_memory_limit() { let mut metrics = None; Arc::clone(&plan) .transform_down(|node| { - if node.as_any().is::() { + if node.is::() { metrics = node.metrics(); } Ok(Transformed::no(node)) diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index fdefdafa00aa..c96df5c50998 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -88,7 +88,7 @@ async fn check_stats_precision_with_filter_pushdown() { .unwrap(); assert!( - optimized_exec.as_any().is::(), + optimized_exec.is::(), "Sanity check that the pushdown did what we expected" ); // Scan with filter pushdown, stats are inexact @@ -196,7 +196,7 @@ async fn list_files_with_session_level_cache() { //Session 1 first time list files assert_eq!(get_list_file_cache_size(&state1), 0); let exec1 = table1.scan(&state1, None, &[], None).await.unwrap(); - let data_source_exec = exec1.as_any().downcast_ref::().unwrap(); + let data_source_exec = exec1.downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); let parquet1 = data_source .as_any() @@ -212,7 +212,7 @@ async fn list_files_with_session_level_cache() { //check session 1 cache result not show in session 2 assert_eq!(get_list_file_cache_size(&state2), 0); let exec2 = table2.scan(&state2, None, &[], None).await.unwrap(); - let data_source_exec = exec2.as_any().downcast_ref::().unwrap(); + let data_source_exec = exec2.downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); let parquet2 = data_source .as_any() @@ -228,7 +228,7 @@ async fn list_files_with_session_level_cache() { //check session 1 cache result not show in session 2 assert_eq!(get_list_file_cache_size(&state1), 1); let exec3 = table1.scan(&state1, None, &[], None).await.unwrap(); - let data_source_exec = exec3.as_any().downcast_ref::().unwrap(); + let data_source_exec = exec3.downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); let parquet3 = data_source .as_any() diff --git a/datafusion/core/tests/parquet/utils.rs b/datafusion/core/tests/parquet/utils.rs index e5e0026ec1f1..77bc808f1ea0 100644 --- a/datafusion/core/tests/parquet/utils.rs +++ b/datafusion/core/tests/parquet/utils.rs @@ -47,7 +47,7 @@ impl MetricsFinder { impl ExecutionPlanVisitor for MetricsFinder { type Error = std::convert::Infallible; fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { - if let Some(data_source_exec) = plan.as_any().downcast_ref::() + if let Some(data_source_exec) = plan.downcast_ref::() && data_source_exec .downcast_to_file_source::() .is_some() diff --git a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs index 850f9d187780..808e163b0836 100644 --- a/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs @@ -83,7 +83,7 @@ async fn assert_count_optim_success( let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?; // A ProjectionExec is a sign that the count optimization was applied - assert!(optimized.as_any().is::()); + assert!(optimized.is::()); // run both the optimized and nonoptimized plan let optimized_result = @@ -280,7 +280,7 @@ async fn test_count_inexact_stat() -> Result<()> { let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; // check that the original ExecutionPlan was not replaced - assert!(optimized.as_any().is::()); + assert!(optimized.is::()); Ok(()) } @@ -324,7 +324,7 @@ async fn test_count_with_nulls_inexact_stat() -> Result<()> { let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; // check that the original ExecutionPlan was not replaced - assert!(optimized.as_any().is::()); + assert!(optimized.is::()); Ok(()) } @@ -526,7 +526,7 @@ async fn test_count_distinct_optimization() -> Result<()> { if case.expect_optimized { assert!( - optimized.as_any().is::(), + optimized.is::(), "'{}': expected ProjectionExec", case.name ); @@ -544,7 +544,7 @@ async fn test_count_distinct_optimization() -> Result<()> { } } else { assert!( - optimized.as_any().is::(), + optimized.is::(), "'{}': expected AggregateExec (not optimized)", case.name ); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 3a6106c45356..4aea2a052aeb 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -174,10 +174,6 @@ impl ExecutionPlan for SortRequiredExec { "SortRequiredExec" } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -275,10 +271,6 @@ impl ExecutionPlan for SinglePartitionMaintainsOrderExec { "SinglePartitionMaintainsOrderExec" } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -3901,7 +3893,6 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { // Verify the plan was transformed to CoalescePartitionsExec result .plan - .as_any() .downcast_ref::() .expect("Expected CoalescePartitionsExec"); diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index 6fe77c5e89e6..a255c0754582 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -4666,7 +4666,6 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() { // Get the HashJoinExec to check the dynamic filter let hash_join = plan - .as_any() .downcast_ref::() .expect("Plan should be HashJoinExec"); diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 1c94a7bd1e91..ee22e7b61643 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -18,7 +18,6 @@ use insta::assert_snapshot; use std::sync::Arc; use std::{ - any::Any, pin::Pin, task::{Context, Poll}, }; @@ -233,7 +232,6 @@ async fn test_join_with_swap() { .unwrap(); let swapping_projection = optimized_join - .as_any() .downcast_ref::() .expect("A proj is required to swap columns back to their original order"); @@ -247,7 +245,6 @@ async fn test_join_with_swap() { let swapped_join = swapping_projection .input() - .as_any() .downcast_ref::() .expect("The type of the plan should not be changed"); @@ -296,7 +293,6 @@ async fn test_left_join_no_swap() { .unwrap(); let swapped_join = optimized_join - .as_any() .downcast_ref::() .expect("The type of the plan should not be changed"); @@ -346,12 +342,9 @@ async fn test_join_with_swap_semi() { .optimize(Arc::new(join), &ConfigOptions::new()) .unwrap(); - let swapped_join = optimized_join - .as_any() - .downcast_ref::() - .expect( - "A proj is not required to swap columns back to their original order", - ); + let swapped_join = optimized_join.downcast_ref::().expect( + "A proj is not required to swap columns back to their original order", + ); assert_eq!(swapped_join.schema().fields().len(), 1); assert_eq!( @@ -402,12 +395,9 @@ async fn test_join_with_swap_mark() { .optimize(Arc::new(join), &ConfigOptions::new()) .unwrap(); - let swapped_join = optimized_join - .as_any() - .downcast_ref::() - .expect( - "A proj is not required to swap columns back to their original order", - ); + let swapped_join = optimized_join.downcast_ref::().expect( + "A proj is not required to swap columns back to their original order", + ); assert_eq!(swapped_join.schema().fields().len(), 2); assert_eq!( @@ -535,7 +525,6 @@ async fn test_join_no_swap() { .unwrap(); let swapped_join = optimized_join - .as_any() .downcast_ref::() .expect("The type of the plan should not be changed"); @@ -584,7 +573,6 @@ async fn test_nl_join_with_swap(join_type: JoinType) { .unwrap(); let swapping_projection = optimized_join - .as_any() .downcast_ref::() .expect("A proj is required to swap columns back to their original order"); @@ -598,7 +586,6 @@ async fn test_nl_join_with_swap(join_type: JoinType) { let swapped_join = swapping_projection .input() - .as_any() .downcast_ref::() .expect("The type of the plan should not be changed"); @@ -665,7 +652,6 @@ async fn test_nl_join_with_swap_no_proj(join_type: JoinType) { .unwrap(); let swapped_join = optimized_join - .as_any() .downcast_ref::() .expect("The type of the plan should not be changed"); @@ -759,7 +745,9 @@ async fn test_hash_join_swap_on_joins_with_projections( let swapped = join .swap_inputs(PartitionMode::Partitioned) .expect("swap_hash_join must support joins with projections"); - let swapped_join = swapped.as_any().downcast_ref::().expect( + let swapped_join = swapped + .downcast_ref::() + .expect( "ProjectionExec won't be added above if HashJoinExec contains embedded projection", ); @@ -926,18 +914,15 @@ fn check_join_partition_mode( if !is_swapped { let swapped_join = optimized_join - .as_any() .downcast_ref::() .expect("The type of the plan should not be changed"); assert_eq!(*swapped_join.partition_mode(), expected_mode); } else { let swapping_projection = optimized_join - .as_any() .downcast_ref::() .expect("A proj is required to swap columns back to their original order"); let swapped_join = swapping_projection .input() - .as_any() .downcast_ref::() .expect("The type of the plan should not be changed"); @@ -1049,10 +1034,6 @@ impl ExecutionPlan for UnboundedExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -1164,10 +1145,6 @@ impl ExecutionPlan for StatisticsExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -1595,10 +1572,9 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> { JoinSelection::new().optimize(Arc::clone(&join), &ConfigOptions::new())?; // If swap did happen - let projection_added = optimized_join_plan.as_any().is::(); + let projection_added = optimized_join_plan.is::(); let plan = if projection_added { let proj = optimized_join_plan - .as_any() .downcast_ref::() .expect("A proj is required to swap columns back to their original order"); Arc::::clone(proj.input()) @@ -1612,7 +1588,7 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> { join_type, mode, .. - }) = plan.as_any().downcast_ref::() + }) = plan.downcast_ref::() { let left_changed = Arc::ptr_eq(left, &right_exec); let right_changed = Arc::ptr_eq(right, &left_exec); diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index d3448062d106..a85ce8d080d1 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -515,7 +515,6 @@ fn test_memory_after_projection() -> Result<()> { assert_eq!( after_optimize .clone() - .as_any() .downcast_ref::() .unwrap() .data_source() @@ -598,10 +597,7 @@ fn test_streaming_table_after_projection() -> Result<()> { let after_optimize = ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?; - let result = after_optimize - .as_any() - .downcast_ref::() - .unwrap(); + let result = after_optimize.downcast_ref::().unwrap(); assert_eq!( result.partition_schema(), &Arc::new(Schema::new(vec![ @@ -792,7 +788,6 @@ fn test_output_req_after_projection() -> Result<()> { ); assert_eq!( after_optimize - .as_any() .downcast_ref::() .unwrap() .required_input_ordering()[0] @@ -805,7 +800,6 @@ fn test_output_req_after_projection() -> Result<()> { Arc::new(Column::new("b", 2)), ]; if let Distribution::HashPartitioned(vec) = after_optimize - .as_any() .downcast_ref::() .unwrap() .required_input_distribution()[0] @@ -1036,7 +1030,6 @@ fn test_join_after_projection() -> Result<()> { assert_eq!( expected_filter_col_ind, after_optimize - .as_any() .downcast_ref::() .unwrap() .filter() @@ -1400,7 +1393,6 @@ fn test_repartition_after_projection() -> Result<()> { assert_eq!( after_optimize - .as_any() .downcast_ref::() .unwrap() .partitioning() diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs index ce2cb04b64a5..e8c02d4efcfd 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs @@ -490,10 +490,6 @@ impl ExecutionPlan for TestNode { "TestInsertExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { self.input.properties() } diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 8d9e7b68b8c9..4b6db1abc45d 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -17,7 +17,6 @@ //! Test utilities for physical optimizer tests -use std::any::Any; use std::fmt::{Display, Formatter}; use std::sync::{Arc, LazyLock}; @@ -452,10 +451,6 @@ impl ExecutionPlan for RequirementsTestExec { "RequiredInputOrderingExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { self.input.properties() } @@ -927,10 +922,6 @@ impl ExecutionPlan for TestScan { "TestScan" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index b8dc6ab59cee..8ab0d150a727 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -139,14 +139,14 @@ async fn explain_analyze_baseline_metrics() { use datafusion::physical_plan; use datafusion::physical_plan::sorts; - plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() - || plan.as_any().downcast_ref::().is_some() + plan.is::() + || plan.is::() + || plan.is::() + || plan.is::() + || plan.is::() + || plan.is::() + || plan.is::() + || plan.is::() } // Validate that the recorded elapsed compute time was more than diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 2a2aed82f0af..2bab79df424b 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -58,7 +58,7 @@ async fn insert_operation_is_passed_correctly_to_table_provider() { async fn assert_insert_op(ctx: &SessionContext, sql: &str, insert_op: InsertOp) { let df = ctx.sql(sql).await.unwrap(); let plan = df.create_physical_plan().await.unwrap(); - let exec = plan.as_any().downcast_ref::().unwrap(); + let exec = plan.downcast_ref::().unwrap(); assert_eq!(exec.op, insert_op); } @@ -158,10 +158,6 @@ impl ExecutionPlan for TestInsertExec { "TestInsertExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 6e4ed69e508d..505468a19cd3 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -60,7 +60,7 @@ use std::fmt::Debug; use std::hash::Hash; use std::task::{Context, Poll}; -use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; +use std::{collections::BTreeMap, fmt, sync::Arc}; use arrow::array::{Array, ArrayRef, StringViewArray}; use arrow::{ @@ -706,10 +706,6 @@ impl ExecutionPlan for TopKExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 155c951fe575..d438f2de92a0 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -171,10 +171,6 @@ impl ExecutionPlan for DataSinkExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 73fc46f634ac..81e15d0a2a09 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -285,10 +285,6 @@ impl ExecutionPlan for DataSourceExec { "DataSourceExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 465947065687..54957c273abc 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -907,6 +907,25 @@ pub trait AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync + Any { } } +impl dyn AggregateUDFImpl { + /// Returns `true` if the implementation is of type `T`. + /// + /// Works correctly when called on `Arc` via auto-deref. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast to a concrete type `T`, returning `None` if the + /// implementation is not of that type. + /// + /// Works correctly when called on `Arc` via auto-deref, + /// unlike `(&arc as &dyn Any).downcast_ref::()` which would attempt to + /// downcast the `Arc` itself. + pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} + impl PartialEq for dyn AggregateUDFImpl { fn eq(&self, other: &Self) -> bool { self.dyn_eq(other) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 418dc9660614..ddbfb65df053 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -360,7 +360,7 @@ impl ScalarUDF { /// Return true if this function is an async function pub fn as_async(&self) -> Option<&AsyncScalarUDF> { - (self.inner().as_ref() as &dyn Any).downcast_ref::() + self.inner().downcast_ref::() } /// Returns placement information for this function. @@ -984,6 +984,25 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync + Any { } } +impl dyn ScalarUDFImpl { + /// Returns `true` if the implementation is of type `T`. + /// + /// Works correctly when called on `Arc` via auto-deref. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast to a concrete type `T`, returning `None` if the + /// implementation is not of that type. + /// + /// Works correctly when called on `Arc` via auto-deref, + /// unlike `(&arc as &dyn Any).downcast_ref::()` which would attempt to + /// downcast the `Arc` itself. + pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} + /// ScalarUDF that adds an alias to the underlying function. It is better to /// implement [`ScalarUDFImpl`], which supports aliases, directly if possible. #[derive(Debug, PartialEq, Eq, Hash)] diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index d2f7f1334804..5a5daca28a91 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -431,6 +431,25 @@ pub trait WindowUDFImpl: Debug + DynEq + DynHash + Send + Sync + Any { } } +impl dyn WindowUDFImpl { + /// Returns `true` if the implementation is of type `T`. + /// + /// Works correctly when called on `Arc` via auto-deref. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast to a concrete type `T`, returning `None` if the + /// implementation is not of that type. + /// + /// Works correctly when called on `Arc` via auto-deref, + /// unlike `(&arc as &dyn Any).downcast_ref::()` which would attempt to + /// downcast the `Arc` itself. + pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} + /// the effect this function will have on the limit pushdown pub enum LimitEffect { /// Does not affect the limit (i.e. this is causal) diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index eba16d939078..4733a1e9b9c2 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -214,7 +214,7 @@ fn pass_runtime_to_children( runtime: &Handle, ) -> Result>> { let mut updated_children = false; - let plan_is_foreign = plan.as_any().is::(); + let plan_is_foreign = plan.is::(); let children = plan .children() @@ -233,7 +233,7 @@ fn pass_runtime_to_children( // `ForeignExecutionPlan`. In this case wrap the plan in a `ForeignExecutionPlan` // because when we call `with_new_children` below it will extract the // FFI plan that does contain the runtime. - if plan_is_foreign && !child.as_any().is::() { + if plan_is_foreign && !child.is::() { updated_children = true; let ffi_child = FFI_ExecutionPlan::new(child, Some(runtime.clone())); let foreign_child = ForeignExecutionPlan::try_from(ffi_child); @@ -255,7 +255,7 @@ impl FFI_ExecutionPlan { pub fn new(mut plan: Arc, runtime: Option) -> Self { // Note to developers: `pass_runtime_to_children` relies on the logic here to // get the underlying FFI plan during calls to `new_with_children`. - if let Some(plan) = plan.as_any().downcast_ref::() { + if let Some(plan) = plan.downcast_ref::() { return plan.plan.clone(); } @@ -369,10 +369,6 @@ impl ExecutionPlan for ForeignExecutionPlan { &self.name } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn properties(&self) -> &Arc { &self.properties } @@ -482,10 +478,6 @@ pub mod tests { "empty-exec" } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn properties(&self) -> &Arc { &self.props } @@ -610,16 +602,11 @@ pub mod tests { // Verify local libraries can be downcast to their original let foreign_plan: Arc = (&ffi_plan).try_into().unwrap(); - assert!(foreign_plan.as_any().downcast_ref::().is_some()); + assert!(foreign_plan.is::()); // Verify different library markers generate foreign providers ffi_plan.library_marker_id = crate::mock_foreign_marker_id; let foreign_plan: Arc = (&ffi_plan).try_into().unwrap(); - assert!( - foreign_plan - .as_any() - .downcast_ref::() - .is_some() - ); + assert!(foreign_plan.is::()); } } diff --git a/datafusion/ffi/src/proto/logical_extension_codec.rs b/datafusion/ffi/src/proto/logical_extension_codec.rs index 19fc73b368cd..a5be8588d23e 100644 --- a/datafusion/ffi/src/proto/logical_extension_codec.rs +++ b/datafusion/ffi/src/proto/logical_extension_codec.rs @@ -489,7 +489,6 @@ impl LogicalExtensionCodec for ForeignLogicalExtensionCodec { #[cfg(test)] mod tests { - use std::any::Any; use std::sync::Arc; use arrow::array::record_batch; @@ -659,7 +658,7 @@ mod tests { let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?; - assert!((returned_udf.inner().as_ref() as &dyn Any).is::()); + assert!(returned_udf.inner().is::()); Ok(()) } @@ -680,7 +679,7 @@ mod tests { let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?; - assert!((returned_udf.inner().as_ref() as &dyn Any).is::()); + assert!(returned_udf.inner().is::()); Ok(()) } @@ -704,7 +703,7 @@ mod tests { let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?; - assert!((returned_udf.inner().as_ref() as &dyn Any).is::()); + assert!(returned_udf.inner().is::()); Ok(()) } diff --git a/datafusion/ffi/src/proto/physical_extension_codec.rs b/datafusion/ffi/src/proto/physical_extension_codec.rs index ad4a9a0ae75b..882da7c08474 100644 --- a/datafusion/ffi/src/proto/physical_extension_codec.rs +++ b/datafusion/ffi/src/proto/physical_extension_codec.rs @@ -410,7 +410,6 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec { #[cfg(test)] pub(crate) mod tests { - use std::any::Any; use std::sync::Arc; use arrow_schema::{DataType, Field, Schema}; @@ -466,7 +465,7 @@ pub(crate) mod tests { ) -> Result<()> { buf.push(Self::MAGIC_NUMBER); - let Some(_) = node.as_any().downcast_ref::() else { + let Some(_) = node.downcast_ref::() else { return exec_err!("TestExtensionCodec only expects EmptyExec"); }; @@ -493,7 +492,7 @@ pub(crate) mod tests { buf.push(Self::MAGIC_NUMBER); let udf = node.inner(); - if !(udf.as_ref() as &dyn Any).is::() { + if !udf.is::() { return exec_err!("TestExtensionCodec only expects Abs UDF"); }; @@ -520,7 +519,7 @@ pub(crate) mod tests { buf.push(Self::MAGIC_NUMBER); let udf = node.inner(); - let Some(_udf) = (udf.as_ref() as &dyn Any).downcast_ref::() else { + let Some(_udf) = udf.downcast_ref::() else { return exec_err!("TestExtensionCodec only expects Sum UDAF"); }; @@ -550,7 +549,7 @@ pub(crate) mod tests { buf.push(Self::MAGIC_NUMBER); let udf = node.inner(); - let Some(udf) = (udf.as_ref() as &dyn Any).downcast_ref::() else { + let Some(udf) = udf.downcast_ref::() else { return exec_err!("TestExtensionCodec only expects Rank UDWF"); }; @@ -588,7 +587,7 @@ pub(crate) mod tests { let returned_exec = foreign_codec.try_decode(&bytes, &input_execs, ctx.task_ctx().as_ref())?; - assert!(returned_exec.as_any().is::()); + assert!(returned_exec.is::()); Ok(()) } @@ -609,7 +608,7 @@ pub(crate) mod tests { let returned_udf = foreign_codec.try_decode_udf(udf.name(), &bytes)?; - assert!((returned_udf.inner().as_ref() as &dyn Any).is::()); + assert!(returned_udf.inner().is::()); Ok(()) } @@ -630,7 +629,7 @@ pub(crate) mod tests { let returned_udf = foreign_codec.try_decode_udaf(udf.name(), &bytes)?; - assert!((returned_udf.inner().as_ref() as &dyn Any).is::()); + assert!(returned_udf.inner().is::()); Ok(()) } @@ -654,7 +653,7 @@ pub(crate) mod tests { let returned_udf = foreign_codec.try_decode_udwf(udf.name(), &bytes)?; - assert!((returned_udf.inner().as_ref() as &dyn Any).is::()); + assert!(returned_udf.inner().is::()); Ok(()) } diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs index e9fa31a7fc6e..0097872d4970 100644 --- a/datafusion/ffi/src/tests/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -191,10 +191,6 @@ impl ExecutionPlan for AsyncTestExecutionPlan { "async test execution plan" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index d3cb3ca7bb70..57f828260586 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::ffi::c_void; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -372,9 +371,7 @@ impl Clone for FFI_AggregateUDF { impl From> for FFI_AggregateUDF { fn from(udaf: Arc) -> Self { - if let Some(udaf) = - (udaf.inner().as_ref() as &dyn Any).downcast_ref::() - { + if let Some(udaf) = udaf.inner().downcast_ref::() { return udaf.udaf.clone(); } @@ -644,7 +641,6 @@ impl From for FFI_AggregateOrderSensitivity { #[cfg(test)] mod tests { - use std::any::Any; use std::collections::HashMap; use arrow::datatypes::Schema; @@ -855,20 +851,12 @@ mod tests { // Verify local libraries can be downcast to their original let foreign_udaf: Arc = (&ffi_udaf).into(); - assert!( - (foreign_udaf.as_ref() as &dyn Any) - .downcast_ref::() - .is_some() - ); + assert!(foreign_udaf.is::()); // Verify different library markers generate foreign providers ffi_udaf.library_marker_id = crate::mock_foreign_marker_id; let foreign_udaf: Arc = (&ffi_udaf).into(); - assert!( - (foreign_udaf.as_ref() as &dyn Any) - .downcast_ref::() - .is_some() - ); + assert!(foreign_udaf.is::()); Ok(()) } diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index 766c7dece3d8..fc26b2b098b0 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::ffi::c_void; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -231,9 +230,7 @@ impl Clone for FFI_ScalarUDF { impl From> for FFI_ScalarUDF { fn from(udf: Arc) -> Self { - if let Some(udf) = - (udf.inner().as_ref() as &dyn Any).downcast_ref::() - { + if let Some(udf) = udf.inner().downcast_ref::() { return udf.udf.clone(); } @@ -433,7 +430,6 @@ impl ScalarUDFImpl for ForeignScalarUDF { #[cfg(test)] mod tests { use super::*; - use std::any::Any; #[test] fn test_round_trip_scalar_udf() -> Result<()> { @@ -460,20 +456,12 @@ mod tests { // Verify local libraries can be downcast to their original let foreign_udf: Arc = (&ffi_udf).into(); - assert!( - (foreign_udf.as_ref() as &dyn Any) - .downcast_ref::() - .is_some() - ); + assert!(foreign_udf.is::()); // Verify different library markers generate foreign providers ffi_udf.library_marker_id = crate::mock_foreign_marker_id; let foreign_udf: Arc = (&ffi_udf).into(); - assert!( - (foreign_udf.as_ref() as &dyn Any) - .downcast_ref::() - .is_some() - ); + assert!(foreign_udf.is::()); Ok(()) } diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index 7b5183f7f6ea..b7f6baac4c37 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -222,9 +222,7 @@ impl Clone for FFI_WindowUDF { impl From> for FFI_WindowUDF { fn from(udf: Arc) -> Self { - if let Some(udwf) = (udf.inner().as_ref() as &dyn std::any::Any) - .downcast_ref::() - { + if let Some(udwf) = udf.inner().downcast_ref::() { return udwf.udf.clone(); } @@ -474,20 +472,12 @@ mod tests { // Verify local libraries can be downcast to their original let foreign_udwf: Arc = (&ffi_udwf).into(); - assert!( - (foreign_udwf.as_ref() as &dyn std::any::Any) - .downcast_ref::() - .is_some() - ); + assert!(foreign_udwf.is::()); // Verify different library markers generate foreign providers ffi_udwf.library_marker_id = crate::mock_foreign_marker_id; let foreign_udwf: Arc = (&ffi_udwf).into(); - assert!( - (foreign_udwf.as_ref() as &dyn std::any::Any) - .downcast_ref::() - .is_some() - ); + assert!(foreign_udwf.is::()); Ok(()) } diff --git a/datafusion/ffi/tests/ffi_execution_plan.rs b/datafusion/ffi/tests/ffi_execution_plan.rs index d81f947dc80e..4911566f4f90 100644 --- a/datafusion/ffi/tests/ffi_execution_plan.rs +++ b/datafusion/ffi/tests/ffi_execution_plan.rs @@ -58,7 +58,7 @@ mod tests { let child_plan: Arc = (&child_plan) .try_into() .expect("should be able create plan"); - assert!(child_plan.as_any().is::()); + assert!(child_plan.is::()); let grandchild_plan = generate_local_plan(); diff --git a/datafusion/functions/src/core/getfield.rs b/datafusion/functions/src/core/getfield.rs index 247c1493b134..e1fce4ee6c83 100644 --- a/datafusion/functions/src/core/getfield.rs +++ b/datafusion/functions/src/core/getfield.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::sync::Arc; use arrow::array::{ @@ -440,9 +439,7 @@ impl ScalarUDFImpl for GetFieldFunc { func, args: inner_args, }) = current_expr - && (func.inner().as_ref() as &dyn Any) - .downcast_ref::() - .is_some() + && func.inner().is::() { // Store this level's path arguments (all except the first, which is base/nested call) path_args_stack.push(&inner_args[1..]); diff --git a/datafusion/functions/src/math/log.rs b/datafusion/functions/src/math/log.rs index ae336786a5d8..ac94f78e0c72 100644 --- a/datafusion/functions/src/math/log.rs +++ b/datafusion/functions/src/math/log.rs @@ -17,8 +17,6 @@ //! Math function: `log()`. -use std::any::Any; - use super::power::PowerFunc; use crate::utils::calculate_binary_math; @@ -399,9 +397,7 @@ impl ScalarUDFImpl for LogFunc { /// Returns true if the function is `PowerFunc` fn is_pow(func: &ScalarUDF) -> bool { - (func.inner().as_ref() as &dyn Any) - .downcast_ref::() - .is_some() + func.inner().is::() } #[cfg(test)] diff --git a/datafusion/functions/src/math/power.rs b/datafusion/functions/src/math/power.rs index bac970c95d08..ea7c85a33782 100644 --- a/datafusion/functions/src/math/power.rs +++ b/datafusion/functions/src/math/power.rs @@ -16,8 +16,6 @@ // under the License. //! Math function: `power()`. -use std::any::Any; - use super::log::LogFunc; use crate::utils::{calculate_binary_decimal_math, calculate_binary_math}; @@ -601,9 +599,7 @@ impl ScalarUDFImpl for PowerFunc { /// Return true if this function call is a call to `Log` fn is_log(func: &ScalarUDF) -> bool { - (func.inner().as_ref() as &dyn Any) - .downcast_ref::() - .is_some() + func.inner().is::() } #[cfg(test)] diff --git a/datafusion/physical-expr/src/async_scalar_function.rs b/datafusion/physical-expr/src/async_scalar_function.rs index 4fc79822b596..9f58d7c77a83 100644 --- a/datafusion/physical-expr/src/async_scalar_function.rs +++ b/datafusion/physical-expr/src/async_scalar_function.rs @@ -100,8 +100,7 @@ impl AsyncFuncExpr { /// Return the ideal batch size for this function pub fn ideal_batch_size(&self) -> Result> { if let Some(expr) = self.func.as_any().downcast_ref::() - && let Some(udf) = - (expr.fun().inner().as_ref() as &dyn Any).downcast_ref::() + && let Some(udf) = expr.fun().inner().downcast_ref::() { return Ok(udf.ideal_batch_size()); } @@ -125,7 +124,9 @@ impl AsyncFuncExpr { ); }; - let Some(async_udf) = (scalar_function_expr.fun().inner().as_ref() as &dyn Any) + let Some(async_udf) = scalar_function_expr + .fun() + .inner() .downcast_ref::() else { return not_impl_err!( diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index ef800b1439c3..5e6e0aa97f00 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -46,7 +46,7 @@ use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::functions::fields_with_udf; use datafusion_expr::{ ColumnarValue, ExpressionPlacement, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, - Volatility, expr_vec_fmt, + ScalarUDFImpl, Volatility, expr_vec_fmt, }; /// Physical expression of a scalar function @@ -169,16 +169,10 @@ impl ScalarFunctionExpr { /// Otherwise returns `Some(ScalarFunctionExpr)`. pub fn try_downcast_func(expr: &dyn PhysicalExpr) -> Option<&ScalarFunctionExpr> where - T: 'static, + T: ScalarUDFImpl, { match expr.as_any().downcast_ref::() { - Some(scalar_expr) - if (scalar_expr.fun().inner().as_ref() as &dyn Any) - .downcast_ref::() - .is_some() => - { - Some(scalar_expr) - } + Some(scalar_expr) if scalar_expr.fun().inner().is::() => Some(scalar_expr), _ => None, } } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 5caee8b047d8..75da1873263d 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -51,7 +51,6 @@ impl PhysicalOptimizerRule for AggregateStatistics { ) -> Result> { if let Some(partial_agg_exec) = take_optimizable(&*plan) { let partial_agg_exec = partial_agg_exec - .as_any() .downcast_ref::() .expect("take_optimizable() ensures that this is a AggregateExec"); let stats = partial_agg_exec.input().partition_statistics(None)?; @@ -115,13 +114,13 @@ impl PhysicalOptimizerRule for AggregateStatistics { /// We would have preferred to return a casted ref to AggregateExec but the recursion requires /// the `ExecutionPlan.children()` method that returns an owned reference. fn take_optimizable(node: &dyn ExecutionPlan) -> Option> { - if let Some(final_agg_exec) = node.as_any().downcast_ref::() + if let Some(final_agg_exec) = node.downcast_ref::() && final_agg_exec.mode().input_mode() == AggregateInputMode::Partial && final_agg_exec.group_expr().is_empty() { let mut child = Arc::clone(final_agg_exec.input()); loop { - if let Some(partial_agg_exec) = child.as_any().downcast_ref::() + if let Some(partial_agg_exec) = child.downcast_ref::() && partial_agg_exec.mode().input_mode() == AggregateInputMode::Raw && partial_agg_exec.group_expr().is_empty() && partial_agg_exec.filter_expr().iter().all(|e| e.is_none()) diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs index 860406118c1b..74e938e75ed6 100644 --- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs +++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs @@ -54,7 +54,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { ) -> Result> { plan.transform_down(|plan| { // Check if the plan is AggregateExec - let Some(agg_exec) = plan.as_any().downcast_ref::() else { + let Some(agg_exec) = plan.downcast_ref::() else { return Ok(Transformed::no(plan)); }; @@ -66,8 +66,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { } // Check if the input is AggregateExec - let Some(input_agg_exec) = - agg_exec.input().as_any().downcast_ref::() + let Some(input_agg_exec) = agg_exec.input().downcast_ref::() else { return Ok(Transformed::no(plan)); }; diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 504197a2ded5..ea47c1203039 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -21,6 +21,7 @@ //! according to the configuration), this rule increases partition counts in //! the physical plan. +use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -294,7 +295,7 @@ pub fn adjust_input_keys_ordering( mode, .. }, - ) = plan.as_any().downcast_ref::() + ) = plan.downcast_ref::() { match mode { PartitionMode::Partitioned => { @@ -338,8 +339,7 @@ pub fn adjust_input_keys_ordering( requirements.data.clear(); } } - } else if let Some(CrossJoinExec { left, .. }) = - plan.as_any().downcast_ref::() + } else if let Some(CrossJoinExec { left, .. }) = plan.downcast_ref::() { let left_columns_len = left.schema().fields().len(); // Push down requirements to the right side @@ -355,7 +355,7 @@ pub fn adjust_input_keys_ordering( sort_options, null_equality, .. - }) = plan.as_any().downcast_ref::() + }) = plan.downcast_ref::() { let join_constructor = |new_conditions: ( Vec<(PhysicalExprRef, PhysicalExprRef)>, @@ -379,7 +379,7 @@ pub fn adjust_input_keys_ordering( &join_constructor, ) .map(Transformed::yes); - } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::() { + } else if let Some(aggregate_exec) = plan.downcast_ref::() { if !requirements.data.is_empty() { if aggregate_exec.mode() == &AggregateMode::FinalPartitioned { return reorder_aggregate_keys(requirements, aggregate_exec) @@ -391,7 +391,7 @@ pub fn adjust_input_keys_ordering( // Keep everything unchanged return Ok(Transformed::no(requirements)); } - } else if let Some(proj) = plan.as_any().downcast_ref::() { + } else if let Some(proj) = plan.downcast_ref::() { let expr = proj.expr(); // For Projection, we need to transform the requirements to the columns before the Projection // And then to push down the requirements @@ -407,12 +407,9 @@ pub fn adjust_input_keys_ordering( // Can not satisfy, clear the current requirements and generate new empty requirements requirements.data.clear(); } - } else if plan.as_any().downcast_ref::().is_some() - || plan - .as_any() - .downcast_ref::() - .is_some() - || plan.as_any().downcast_ref::().is_some() + } else if plan.is::() + || plan.is::() + || plan.is::() { requirements.data.clear(); } else { @@ -484,7 +481,7 @@ pub fn reorder_aggregate_keys( && agg_exec.group_expr().null_expr().is_empty() && !physical_exprs_equal(&output_exprs, parent_required) && let Some(positions) = expected_expr_positions(&output_exprs, parent_required) - && let Some(agg_exec) = agg_exec.input().as_any().downcast_ref::() + && let Some(agg_exec) = agg_exec.input().downcast_ref::() && *agg_exec.mode() == AggregateMode::Partial { let group_exprs = agg_exec.group_expr().expr(); @@ -563,11 +560,13 @@ fn shift_right_required( let new_right_required = parent_required .iter() .filter_map(|r| { - r.as_any().downcast_ref::().and_then(|col| { - col.index() - .checked_sub(left_columns_len) - .map(|index| Arc::new(Column::new(col.name(), index)) as _) - }) + (r.as_ref() as &dyn Any) + .downcast_ref::() + .and_then(|col| { + col.index() + .checked_sub(left_columns_len) + .map(|index| Arc::new(Column::new(col.name(), index)) as _) + }) }) .collect::>(); @@ -598,7 +597,6 @@ fn shift_right_required( pub fn reorder_join_keys_to_inputs( plan: Arc, ) -> Result> { - let plan_any = plan.as_any(); if let Some( exec @ HashJoinExec { left, @@ -607,7 +605,7 @@ pub fn reorder_join_keys_to_inputs( mode, .. }, - ) = plan_any.downcast_ref::() + ) = plan.downcast_ref::() { if *mode == PartitionMode::Partitioned { let (join_keys, positions) = reorder_current_join_keys( @@ -639,7 +637,7 @@ pub fn reorder_join_keys_to_inputs( sort_options, null_equality, .. - }) = plan_any.downcast_ref::() + }) = plan.downcast_ref::() { let (join_keys, positions) = reorder_current_join_keys( extract_join_keys(on), @@ -1066,8 +1064,7 @@ pub fn replace_order_preserving_variants( CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()), ); return Ok(context); - } else if let Some(repartition) = - context.plan.as_any().downcast_ref::() + } else if let Some(repartition) = context.plan.downcast_ref::() && repartition.preserve_order() { context.plan = Arc::new(RepartitionExec::try_new( @@ -1226,7 +1223,7 @@ pub fn ensure_distribution( children, } = remove_dist_changing_operators(dist_context)?; - if let Some(exec) = plan.as_any().downcast_ref::() { + if let Some(exec) = plan.downcast_ref::() { if let Some(updated_window) = get_best_fitting_window( exec.window_expr(), exec.input(), @@ -1234,7 +1231,7 @@ pub fn ensure_distribution( )? { plan = updated_window; } - } else if let Some(exec) = plan.as_any().downcast_ref::() + } else if let Some(exec) = plan.downcast_ref::() && let Some(updated_window) = get_best_fitting_window( exec.window_expr(), exec.input(), @@ -1274,10 +1271,9 @@ pub fn ensure_distribution( // CollectLeft/CollectRight modes are safe because one side is collected // to a single partition which eliminates partition-to-partition mapping. let is_partitioned_join = plan - .as_any() .downcast_ref::() .is_some_and(|join| join.mode == PartitionMode::Partitioned) - || plan.as_any().is::(); + || plan.is::(); let repartition_status_flags = get_repartition_requirement_status(&plan, batch_size, should_use_estimates)?; @@ -1326,7 +1322,7 @@ pub fn ensure_distribution( // aggregates from different partitions are correctly combined. let requires_grouping_id = matches!(&requirement, Distribution::HashPartitioned(exprs) if exprs.iter().any(|expr| { - expr.as_any() + (expr.as_ref() as &dyn Any) .downcast_ref::() .is_some_and(|col| col.name() == Aggregate::INTERNAL_GROUPING_ID) }) @@ -1406,8 +1402,7 @@ pub fn ensure_distribution( child = add_sort_above_with_check( child, sort_req, - plan.as_any() - .downcast_ref::() + plan.downcast_ref::() .map(|output| output.fetch()) .unwrap_or(None), )?; @@ -1435,7 +1430,7 @@ pub fn ensure_distribution( } Distribution::UnspecifiedDistribution => { // Since ordering is lost, trying to preserve ordering is pointless - if !maintains || plan.as_any().is::() { + if !maintains || plan.is::() { child = replace_order_preserving_variants(child)?; } } @@ -1451,7 +1446,7 @@ pub fn ensure_distribution( .map(|c| Arc::clone(&c.plan)) .collect::>(); - plan = if plan.as_any().is::() + plan = if plan.is::() && !config.optimizer.prefer_existing_union && can_interleave(children_plans.iter()) { @@ -1496,31 +1491,31 @@ pub type DistributionContext = PlanContext; fn update_children(mut dist_context: DistributionContext) -> Result { for child_context in dist_context.children.iter_mut() { - let child_plan_any = child_context.plan.as_any(); - child_context.data = - if let Some(repartition) = child_plan_any.downcast_ref::() { - !matches!( - repartition.partitioning(), - Partitioning::UnknownPartitioning(_) - ) - } else { - child_plan_any.is::() - || child_plan_any.is::() - || child_context.plan.children().is_empty() - || child_context.children[0].data - || child_context - .plan - .required_input_distribution() - .iter() - .zip(child_context.children.iter()) - .any(|(required_dist, child_context)| { - child_context.data - && matches!( - required_dist, - Distribution::UnspecifiedDistribution - ) - }) - } + child_context.data = if let Some(repartition) = + child_context.plan.downcast_ref::() + { + !matches!( + repartition.partitioning(), + Partitioning::UnknownPartitioning(_) + ) + } else { + child_context.plan.is::() + || child_context.plan.is::() + || child_context.plan.children().is_empty() + || child_context.children[0].data + || child_context + .plan + .required_input_distribution() + .iter() + .zip(child_context.children.iter()) + .any(|(required_dist, child_context)| { + child_context.data + && matches!( + required_dist, + Distribution::UnspecifiedDistribution + ) + }) + } } dist_context.data = false; diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 5f1b3613143e..729a6b3121a8 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -265,8 +265,7 @@ impl PhysicalOptimizerRule for EnforceSorting { fn replace_with_partial_sort( plan: Arc, ) -> Result> { - let plan_any = plan.as_any(); - let Some(sort_plan) = plan_any.downcast_ref::() else { + let Some(sort_plan) = plan.downcast_ref::() else { return Ok(plan); }; @@ -509,8 +508,7 @@ pub fn ensure_sorting( child = add_sort_above( child, req, - plan.as_any() - .downcast_ref::() + plan.downcast_ref::() .map(|output| output.fetch()) .unwrap_or(None), ); @@ -555,7 +553,7 @@ pub fn ensure_sorting( fn analyze_immediate_sort_removal( mut node: PlanWithCorrespondingSort, ) -> Result> { - let Some(sort_exec) = node.plan.as_any().downcast_ref::() else { + let Some(sort_exec) = node.plan.downcast_ref::() else { return Ok(Transformed::no(node)); }; let sort_input = sort_exec.input(); @@ -620,22 +618,22 @@ fn adjust_window_sort_removal( )?; window_tree.children.push(child_node); - let plan = window_tree.plan.as_any(); let child_plan = &window_tree.children[0].plan; - let (window_expr, new_window) = - if let Some(exec) = plan.downcast_ref::() { - let window_expr = exec.window_expr(); - let new_window = - get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?; - (window_expr, new_window) - } else if let Some(exec) = plan.downcast_ref::() { - let window_expr = exec.window_expr(); - let new_window = - get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?; - (window_expr, new_window) - } else { - return plan_err!("Expected WindowAggExec or BoundedWindowAggExec"); - }; + let (window_expr, new_window) = if let Some(exec) = + window_tree.plan.downcast_ref::() + { + let window_expr = exec.window_expr(); + let new_window = + get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?; + (window_expr, new_window) + } else if let Some(exec) = window_tree.plan.downcast_ref::() { + let window_expr = exec.window_expr(); + let new_window = + get_best_fitting_window(window_expr, child_plan, &exec.partition_keys())?; + (window_expr, new_window) + } else { + return plan_err!("Expected WindowAggExec or BoundedWindowAggExec"); + }; window_tree.plan = if let Some(new_window) = new_window { // We were able to change the window to accommodate the input, use it: @@ -706,7 +704,7 @@ fn remove_bottleneck_in_subplan( .collect::>()?; } let mut new_reqs = requirements.update_plan_from_children()?; - if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::() { + if let Some(repartition) = new_reqs.plan.downcast_ref::() { let input_partitioning = repartition.input().output_partitioning(); // We can remove this repartitioning operator if it is now a no-op: let mut can_remove = input_partitioning.eq(repartition.partitioning()); @@ -744,7 +742,7 @@ fn remove_corresponding_sort_from_sub_plan( requires_single_partition: bool, ) -> Result { // A `SortExec` is always at the bottom of the tree. - if let Some(sort_exec) = node.plan.as_any().downcast_ref::() { + if let Some(sort_exec) = node.plan.downcast_ref::() { // Do not remove sorts with fetch: if sort_exec.fetch().is_none() { node = node.children.swap_remove(0); @@ -777,9 +775,7 @@ fn remove_corresponding_sort_from_sub_plan( if is_sort_preserving_merge(&node.plan) { node.children = node.children.swap_remove(0).children; node.plan = Arc::clone(node.plan.children().swap_remove(0)); - } else if let Some(repartition) = - node.plan.as_any().downcast_ref::() - { + } else if let Some(repartition) = node.plan.downcast_ref::() { node.plan = Arc::new(RepartitionExec::try_new( Arc::clone(&node.children[0].plan), repartition.properties().output_partitioning().clone(), @@ -811,10 +807,9 @@ fn remove_corresponding_sort_from_sub_plan( fn get_sort_exprs( sort_any: &Arc, ) -> Result<(&LexOrdering, Option)> { - if let Some(sort_exec) = sort_any.as_any().downcast_ref::() { + if let Some(sort_exec) = sort_any.downcast_ref::() { Ok((sort_exec.expr(), sort_exec.fetch())) - } else if let Some(spm) = sort_any.as_any().downcast_ref::() - { + } else if let Some(spm) = sort_any.downcast_ref::() { Ok((spm.expr(), spm.fetch())) } else { plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec") diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 23af981b3531..3677f3a45b59 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -280,7 +280,7 @@ fn pushdown_requirement_to_children( } RequirementsCompatibility::NonCompatible => Ok(None), } - } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { + } else if let Some(sort_exec) = plan.downcast_ref::() { let Some(sort_ordering) = sort_exec.properties().output_ordering().cloned() else { return internal_err!("SortExec should have output ordering"); @@ -319,7 +319,7 @@ fn pushdown_requirement_to_children( // `UnionExec` does not have real sort requirements for its input, we // just propagate the sort requirements down: Ok(Some(vec![Some(parent_required); plan.children().len()])) - } else if let Some(smj) = plan.as_any().downcast_ref::() { + } else if let Some(smj) = plan.downcast_ref::() { let left_columns_len = smj.left().schema().fields().len(); let parent_ordering: Vec = parent_required .first() @@ -354,14 +354,14 @@ fn pushdown_requirement_to_children( Ok(None) } } - } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::() { + } else if let Some(aggregate_exec) = plan.downcast_ref::() { handle_aggregate_pushdown(aggregate_exec, parent_required) } else if maintains_input_order.is_empty() || !maintains_input_order.iter().any(|o| *o) - || plan.as_any().is::() - || plan.as_any().is::() + || plan.is::() + || plan.is::() // TODO: Add support for Projection push down - || plan.as_any().is::() + || plan.is::() || pushdown_would_violate_requirements(&parent_required, plan.as_ref()) { // If the current plan is a leaf node or can not maintain any of the input ordering, can not pushed down requirements. @@ -383,7 +383,7 @@ fn pushdown_requirement_to_children( // ordering requirement invalidates requirement of sort preserving merge exec. Ok(None) } - } else if let Some(hash_join) = plan.as_any().downcast_ref::() { + } else if let Some(hash_join) = plan.downcast_ref::() { handle_hash_join(hash_join, parent_required) } else { handle_custom_pushdown(plan, parent_required, &maintains_input_order) diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs index e783c0a9e776..102e21a4853a 100644 --- a/datafusion/physical-optimizer/src/ensure_coop.rs +++ b/datafusion/physical-optimizer/src/ensure_coop.rs @@ -272,7 +272,6 @@ mod tests { SendableRecordBatchStream, execution_plan::{Boundedness, EmissionType}, }; - use std::any::Any; #[derive(Debug)] struct DummyExec { @@ -323,9 +322,6 @@ mod tests { fn name(&self) -> &str { &self.name } - fn as_any(&self) -> &dyn Any { - self - } fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/physical-optimizer/src/hash_join_buffering.rs b/datafusion/physical-optimizer/src/hash_join_buffering.rs index 3c29b46c0fa6..7a198cac13fc 100644 --- a/datafusion/physical-optimizer/src/hash_join_buffering.rs +++ b/datafusion/physical-optimizer/src/hash_join_buffering.rs @@ -64,14 +64,14 @@ impl PhysicalOptimizerRule for HashJoinBuffering { } plan.transform_down(|plan| { - let Some(node) = plan.as_any().downcast_ref::() else { + let Some(node) = plan.downcast_ref::() else { return Ok(Transformed::no(plan)); }; let plan = Arc::clone(&plan); Ok(Transformed::yes( if HashJoinExec::probe_side() == JoinSide::Left { // Do not stack BufferExec nodes together. - if node.left.as_any().downcast_ref::().is_some() { + if node.left.is::() { return Ok(Transformed::no(plan)); } plan.with_new_children(vec![ @@ -80,7 +80,7 @@ impl PhysicalOptimizerRule for HashJoinBuffering { ])? } else { // Do not stack BufferExec nodes together. - if node.right.as_any().downcast_ref::().is_some() { + if node.right.is::() { return Ok(Transformed::no(plan)); } plan.with_new_children(vec![ diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index c796638dd123..0b8c070c3759 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -286,56 +286,55 @@ fn statistical_join_selection_subrule( plan: Arc, config: &ConfigOptions, ) -> Result>> { - let transformed = - if let Some(hash_join) = plan.as_any().downcast_ref::() { - match hash_join.partition_mode() { - PartitionMode::Auto => try_collect_left(hash_join, false, config)? - .map_or_else( - || partitioned_hash_join(hash_join, config).map(Some), - |v| Ok(Some(v)), - )?, - PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)? - .map_or_else( - || partitioned_hash_join(hash_join, config).map(Some), - |v| Ok(Some(v)), - )?, - PartitionMode::Partitioned => { - let left = hash_join.left(); - let right = hash_join.right(); - // Don't swap null-aware anti joins as they have specific side requirements - if hash_join.join_type().supports_swap() - && !hash_join.null_aware - && should_swap_join_order(&**left, &**right, config)? - { - hash_join - .swap_inputs(PartitionMode::Partitioned) - .map(Some)? - } else { - None - } + let transformed = if let Some(hash_join) = plan.downcast_ref::() { + match hash_join.partition_mode() { + PartitionMode::Auto => try_collect_left(hash_join, false, config)? + .map_or_else( + || partitioned_hash_join(hash_join, config).map(Some), + |v| Ok(Some(v)), + )?, + PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)? + .map_or_else( + || partitioned_hash_join(hash_join, config).map(Some), + |v| Ok(Some(v)), + )?, + PartitionMode::Partitioned => { + let left = hash_join.left(); + let right = hash_join.right(); + // Don't swap null-aware anti joins as they have specific side requirements + if hash_join.join_type().supports_swap() + && !hash_join.null_aware + && should_swap_join_order(&**left, &**right, config)? + { + hash_join + .swap_inputs(PartitionMode::Partitioned) + .map(Some)? + } else { + None } } - } else if let Some(cross_join) = plan.as_any().downcast_ref::() { - let left = cross_join.left(); - let right = cross_join.right(); - if should_swap_join_order(&**left, &**right, config)? { - cross_join.swap_inputs().map(Some)? - } else { - None - } - } else if let Some(nl_join) = plan.as_any().downcast_ref::() { - let left = nl_join.left(); - let right = nl_join.right(); - if nl_join.join_type().supports_swap() - && should_swap_join_order(&**left, &**right, config)? - { - nl_join.swap_inputs().map(Some)? - } else { - None - } + } + } else if let Some(cross_join) = plan.downcast_ref::() { + let left = cross_join.left(); + let right = cross_join.right(); + if should_swap_join_order(&**left, &**right, config)? { + cross_join.swap_inputs().map(Some)? } else { None - }; + } + } else if let Some(nl_join) = plan.downcast_ref::() { + let left = nl_join.left(); + let right = nl_join.right(); + if nl_join.join_type().supports_swap() + && should_swap_join_order(&**left, &**right, config)? + { + nl_join.swap_inputs().map(Some)? + } else { + None + } + } else { + None + }; Ok(if let Some(transformed) = transformed { Transformed::yes(transformed) @@ -369,7 +368,7 @@ fn hash_join_convert_symmetric_subrule( config_options: &ConfigOptions, ) -> Result> { // Check if the current plan node is a HashJoinExec. - if let Some(hash_join) = input.as_any().downcast_ref::() { + if let Some(hash_join) = input.downcast_ref::() { let left_unbounded = hash_join.left.boundedness().is_unbounded(); let left_incremental = matches!( hash_join.left.pipeline_behavior(), @@ -508,7 +507,7 @@ pub fn hash_join_swap_subrule( mut input: Arc, _config_options: &ConfigOptions, ) -> Result> { - if let Some(hash_join) = input.as_any().downcast_ref::() + if let Some(hash_join) = input.downcast_ref::() && hash_join.left.boundedness().is_unbounded() && !hash_join.right.boundedness().is_unbounded() && !hash_join.null_aware // Don't swap null-aware anti joins diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 0b85eb805bba..c5fa0cc3ee78 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -325,7 +325,7 @@ pub(crate) fn pushdown_limits( /// Extracts limit information from the [`ExecutionPlan`] if it is a /// [`GlobalLimitExec`] or a [`LocalLimitExec`]. fn extract_limit(plan: &Arc) -> Option { - if let Some(global_limit) = plan.as_any().downcast_ref::() { + if let Some(global_limit) = plan.downcast_ref::() { Some(LimitInfo { input: Arc::clone(global_limit.input()), fetch: global_limit.fetch(), @@ -333,8 +333,7 @@ fn extract_limit(plan: &Arc) -> Option { preserve_order: global_limit.required_ordering().is_some(), }) } else { - plan.as_any() - .downcast_ref::() + plan.downcast_ref::() .map(|local_limit| LimitInfo { input: Arc::clone(local_limit.input()), fetch: Some(local_limit.fetch()), @@ -346,7 +345,6 @@ fn extract_limit(plan: &Arc) -> Option { /// Checks if the given plan combines input partitions. fn combines_input_partitions(plan: &Arc) -> bool { - let plan = plan.as_any(); plan.is::() || plan.is::() } diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs index fdb5cfa6003b..092570b05197 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs @@ -104,7 +104,7 @@ impl PhysicalOptimizerRule for LimitPushPastWindows { } // grow the limit if we hit a window function - if let Some(window) = node.as_any().downcast_ref::() { + if let Some(window) = node.downcast_ref::() { phase = Phase::Apply; if !grow_limit(window, &mut ctx) { return reset(node, &mut ctx); @@ -123,7 +123,7 @@ impl PhysicalOptimizerRule for LimitPushPastWindows { if !node.supports_limit_pushdown() { return reset(node, &mut ctx); } - if let Some(part) = node.as_any().downcast_ref::() { + if let Some(part) = node.downcast_ref::() { let output = part.partitioning().partition_count(); let input = part.input().output_partitioning().partition_count(); if output < input { @@ -185,7 +185,7 @@ fn apply_limit( node: &Arc, ctx: &mut TraverseState, ) -> Option>> { - if !node.as_any().is::() && !node.as_any().is::() { + if !node.is::() && !node.is::() { return None; } let latest = ctx.limit.take(); @@ -202,17 +202,17 @@ fn apply_limit( } fn get_limit(node: &Arc, ctx: &mut TraverseState) -> bool { - if let Some(limit) = node.as_any().downcast_ref::() { + if let Some(limit) = node.downcast_ref::() { ctx.reset_limit(limit.fetch().map(|fetch| fetch + limit.skip())); return true; } // In distributed execution, GlobalLimitExec becomes LocalLimitExec // per partition. Handle it the same way (LocalLimitExec has no skip). - if let Some(limit) = node.as_any().downcast_ref::() { + if let Some(limit) = node.downcast_ref::() { ctx.reset_limit(Some(limit.fetch())); return true; } - if let Some(limit) = node.as_any().downcast_ref::() { + if let Some(limit) = node.downcast_ref::() { ctx.reset_limit(limit.fetch()); return true; } diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs index 7c1fc2a039d2..852dc2a2a943 100644 --- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -69,11 +69,10 @@ impl LimitedDistinctAggregation { let mut global_skip: usize = 0; let children: Vec>; let mut is_global_limit = false; - if let Some(local_limit) = plan.as_any().downcast_ref::() { + if let Some(local_limit) = plan.downcast_ref::() { limit = local_limit.fetch(); children = local_limit.children().into_iter().cloned().collect(); - } else if let Some(global_limit) = plan.as_any().downcast_ref::() - { + } else if let Some(global_limit) = plan.downcast_ref::() { global_fetch = global_limit.fetch(); global_fetch?; global_skip = global_limit.skip(); @@ -104,10 +103,9 @@ impl LimitedDistinctAggregation { if !rewrite_applicable { return Ok(Transformed::no(plan)); } - if let Some(aggr) = plan.as_any().downcast_ref::() { + if let Some(aggr) = plan.downcast_ref::() { if found_match_aggr - && let Some(parent_aggr) = - match_aggr.as_any().downcast_ref::() + && let Some(parent_aggr) = match_aggr.downcast_ref::() && !parent_aggr.group_expr().eq(aggr.group_expr()) { // a partial and final aggregation with different groupings disqualifies diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index f709c9e17ce0..81df6f943c15 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -198,10 +198,6 @@ impl ExecutionPlan for OutputRequirementExec { "OutputRequirementExec" } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -340,9 +336,7 @@ impl PhysicalOptimizerRule for OutputRequirements { RuleMode::Add => require_top_ordering(plan), RuleMode::Remove => plan .transform_up(|plan| { - if let Some(sort_req) = - plan.as_any().downcast_ref::() - { + if let Some(sort_req) = plan.downcast_ref::() { Ok(Transformed::yes(sort_req.input())) } else { Ok(Transformed::no(plan)) @@ -389,7 +383,7 @@ fn require_top_ordering_helper( // Global ordering defines desired ordering in the final result. if children.len() != 1 { Ok((plan, false)) - } else if let Some(sort_exec) = plan.as_any().downcast_ref::() { + } else if let Some(sort_exec) = plan.downcast_ref::() { // In case of constant columns, output ordering of the `SortExec` would // be an empty set. Therefore; we check the sort expression field to // assign the requirements. @@ -407,7 +401,7 @@ fn require_top_ordering_helper( )) as _, true, )) - } else if let Some(spm) = plan.as_any().downcast_ref::() { + } else if let Some(spm) = plan.downcast_ref::() { let reqs = OrderingRequirements::from(spm.expr().clone()); let fetch = spm.fetch(); Ok(( diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs b/datafusion/physical-optimizer/src/projection_pushdown.rs index 44d0926a8b25..7d8bf7aa4732 100644 --- a/datafusion/physical-optimizer/src/projection_pushdown.rs +++ b/datafusion/physical-optimizer/src/projection_pushdown.rs @@ -64,15 +64,13 @@ impl PhysicalOptimizerRule for ProjectionPushdown { ) -> Result> { let alias_generator = AliasGenerator::new(); let plan = plan - .transform_up(|plan| { - match plan.as_any().downcast_ref::() { - None => Ok(Transformed::no(plan)), - Some(hash_join) => try_push_down_join_filter( - Arc::clone(&plan), - hash_join, - &alias_generator, - ), - } + .transform_up(|plan| match plan.downcast_ref::() { + None => Ok(Transformed::no(plan)), + Some(hash_join) => try_push_down_join_filter( + Arc::clone(&plan), + hash_join, + &alias_generator, + ), }) .map(|t| t.data)?; diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 1fa15492d2a9..20c53ef31645 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -84,7 +84,7 @@ impl PhysicalOptimizerRule for PushdownSort { // Use transform_down to find and optimize all SortExec nodes (including nested ones) plan.transform_down(|plan: Arc| { // Check if this is a SortExec - let Some(sort_exec) = plan.as_any().downcast_ref::() else { + let Some(sort_exec) = plan.downcast_ref::() else { return Ok(Transformed::no(plan)); }; diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index 43e6dcbb4417..40c6245d894d 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -89,7 +89,7 @@ pub fn check_finiteness_requirements( input: &dyn ExecutionPlan, optimizer_options: &OptimizerOptions, ) -> Result<()> { - if let Some(exec) = input.as_any().downcast_ref::() + if let Some(exec) = input.downcast_ref::() && !(optimizer_options.allow_symmetric_joins_without_pruning || (exec.check_if_order_information_available()? && is_prunable(exec))) { diff --git a/datafusion/physical-optimizer/src/topk_aggregation.rs b/datafusion/physical-optimizer/src/topk_aggregation.rs index cec6bd70a208..581edd86cd0a 100644 --- a/datafusion/physical-optimizer/src/topk_aggregation.rs +++ b/datafusion/physical-optimizer/src/topk_aggregation.rs @@ -93,7 +93,7 @@ impl TopKAggregation { } fn transform_sort(plan: &Arc) -> Option> { - let sort = plan.as_any().downcast_ref::()?; + let sort = plan.downcast_ref::()?; let children = sort.children(); let child = children.into_iter().exactly_one().ok()?; @@ -109,13 +109,13 @@ impl TopKAggregation { if !cardinality_preserved { return Ok(Transformed::no(plan)); } - if let Some(aggr) = plan.as_any().downcast_ref::() { + if let Some(aggr) = plan.downcast_ref::() { // either we run into an Aggregate and transform it match Self::transform_agg(aggr, &cur_col_name, order_desc, limit) { None => cardinality_preserved = false, Some(plan) => return Ok(Transformed::yes(plan)), } - } else if let Some(proj) = plan.as_any().downcast_ref::() { + } else if let Some(proj) = plan.downcast_ref::() { // track renames due to successive projections for proj_expr in proj.expr() { let Some(src_col) = proj_expr.expr.as_any().downcast_ref::() diff --git a/datafusion/physical-optimizer/src/topk_repartition.rs b/datafusion/physical-optimizer/src/topk_repartition.rs index 9f9878012849..115bdc3cb535 100644 --- a/datafusion/physical-optimizer/src/topk_repartition.rs +++ b/datafusion/physical-optimizer/src/topk_repartition.rs @@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for TopKRepartition { } plan.transform_down(|node| { // Match SortExec with fetch (TopK) - let Some(sort_exec) = node.as_any().downcast_ref::() else { + let Some(sort_exec) = node.downcast_ref::() else { return Ok(Transformed::no(node)); }; let Some(fetch) = sort_exec.fetch() else { @@ -91,17 +91,17 @@ impl PhysicalOptimizerRule for TopKRepartition { // The child might be a CoalesceBatchesExec; look through it let sort_input = sort_exec.input(); - let sort_any = sort_input.as_any(); let (repart_parent, repart_exec) = if let Some(rp) = - sort_any.downcast_ref::() + sort_input.downcast_ref::() { // found a RepartitionExec, use it (None, rp) - } else if let Some(cb_exec) = sort_any.downcast_ref::() { + } else if let Some(cb_exec) = sort_input.downcast_ref::() + { // There's a CoalesceBatchesExec between TopK & RepartitionExec // in this case we will need to reconstruct both nodes let cb_input = cb_exec.input(); - let Some(rp) = cb_input.as_any().downcast_ref::() else { + let Some(rp) = cb_input.downcast_ref::() else { return Ok(Transformed::no(node)); }; (Some(Arc::clone(sort_input)), rp) @@ -133,7 +133,7 @@ impl PhysicalOptimizerRule for TopKRepartition { // Don't push if the input to the repartition is already bounded // (e.g., another TopK), as it would be redundant. let repart_input = repart_exec.input(); - if repart_input.as_any().downcast_ref::().is_some() { + if repart_input.is::() { return Ok(Transformed::no(node)); } diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs b/datafusion/physical-optimizer/src/update_aggr_exprs.rs index 67127c2a238f..2430918e2c2d 100644 --- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs +++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs @@ -78,7 +78,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { _config: &ConfigOptions, ) -> Result> { plan.transform_up(|plan| { - if let Some(aggr_exec) = plan.as_any().downcast_ref::() { + if let Some(aggr_exec) = plan.downcast_ref::() { // Final stage implementations do not rely on ordering -- those // ordering fields may be pruned out by first stage aggregates. // Hence, necessary information for proper merge is added during diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs index 13a1745216e8..a6b01637c970 100644 --- a/datafusion/physical-optimizer/src/utils.rs +++ b/datafusion/physical-optimizer/src/utils.rs @@ -79,37 +79,37 @@ pub fn add_sort_above_with_check( /// Checks whether the given operator is a [`SortExec`]. pub fn is_sort(plan: &Arc) -> bool { - plan.as_any().is::() + plan.is::() } /// Checks whether the given operator is a window; /// i.e. either a [`WindowAggExec`] or a [`BoundedWindowAggExec`]. pub fn is_window(plan: &Arc) -> bool { - plan.as_any().is::() || plan.as_any().is::() + plan.is::() || plan.is::() } /// Checks whether the given operator is a [`UnionExec`]. pub fn is_union(plan: &Arc) -> bool { - plan.as_any().is::() + plan.is::() } /// Checks whether the given operator is a [`SortPreservingMergeExec`]. pub fn is_sort_preserving_merge(plan: &Arc) -> bool { - plan.as_any().is::() + plan.is::() } /// Checks whether the given operator is a [`CoalescePartitionsExec`]. pub fn is_coalesce_partitions(plan: &Arc) -> bool { - plan.as_any().is::() + plan.is::() } /// Checks whether the given operator is a [`RepartitionExec`]. pub fn is_repartition(plan: &Arc) -> bool { - plan.as_any().is::() + plan.is::() } /// Checks whether the given operator is a limit; /// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`]. pub fn is_limit(plan: &Arc) -> bool { - plan.as_any().is::() || plan.as_any().is::() + plan.is::() || plan.is::() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index fb2d64da8354..24bf2265ff05 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -17,7 +17,6 @@ //! Aggregates functionalities -use std::any::Any; use std::sync::Arc; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; @@ -1342,10 +1341,6 @@ impl ExecutionPlan for AggregateExec { } /// Return a reference to Any that can be used for down-casting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -2503,10 +2498,6 @@ mod tests { "TestYieldingExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 845908fdd218..491a0872a2f9 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -17,7 +17,6 @@ //! Defines the ANALYZE operator -use std::any::Any; use std::sync::Arc; use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; @@ -138,10 +137,6 @@ impl ExecutionPlan for AnalyzeExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index abfe870f5266..23f2b9ba1ed5 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -37,7 +37,6 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use futures::Stream; use futures::stream::StreamExt; use log::trace; -use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; @@ -158,10 +157,6 @@ impl ExecutionPlan for AsyncFuncExec { "async_func" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/buffer.rs b/datafusion/physical-plan/src/buffer.rs index 31e38419b32f..0cc4a1d71814 100644 --- a/datafusion/physical-plan/src/buffer.rs +++ b/datafusion/physical-plan/src/buffer.rs @@ -43,7 +43,6 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::{Stream, StreamExt, TryStreamExt}; use pin_project_lite::pin_project; -use std::any::Any; use std::fmt; use std::pin::Pin; use std::sync::Arc; @@ -153,10 +152,6 @@ impl ExecutionPlan for BufferExec { "BufferExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 3e8bfc7f8172..2bf046f03b6c 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -17,7 +17,6 @@ //! [`CoalesceBatchesExec`] combines small batches into larger batches. -use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -168,10 +167,6 @@ impl ExecutionPlan for CoalesceBatchesExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 5ea3589f22b3..9290d725165e 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -18,7 +18,6 @@ //! Defines the merge plan for executing partitions in parallel and then merging the results //! into a single partition -use std::any::Any; use std::sync::Arc; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; @@ -143,10 +142,6 @@ impl ExecutionPlan for CoalescePartitionsExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index 129b30d4b419..fe6a3bc3d567 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -75,7 +75,6 @@ use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_physical_expr::PhysicalExpr; #[cfg(datafusion_coop = "tokio_fallback")] use futures::Future; -use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -262,10 +261,6 @@ impl ExecutionPlan for CooperativeExec { "CooperativeExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> Arc { self.input.schema() } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 964eb152e809..756a68b1a958 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1199,10 +1199,6 @@ mod tests { "TestStatsExecPlan" } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn properties(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 078bc4b8d064..8103695ad08f 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -17,7 +17,6 @@ //! EmptyRelation with produce_one_row=false execution plan -use std::any::Any; use std::sync::Arc; use crate::memory::MemoryStream; @@ -112,10 +111,6 @@ impl ExecutionPlan for EmptyExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 117fd917396a..1a67ea0ded11 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -93,7 +93,7 @@ use futures::stream::{StreamExt, TryStreamExt}; /// /// [`datafusion-examples`]: https://github.com/apache/datafusion/tree/main/datafusion-examples /// [`memory_pool_execution_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs -pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { +pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { /// Short name for the ExecutionPlan, such as 'DataSourceExec'. /// /// Implementation note: this method can just proxy to @@ -117,10 +117,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } } - /// Returns the execution plan as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - /// Get the schema for this execution plan fn schema(&self) -> SchemaRef { Arc::clone(self.properties().schema()) @@ -795,6 +791,26 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } } +impl dyn ExecutionPlan { + /// Returns `true` if the plan is of type `T`. + /// + /// Prefer this over `downcast_ref::().is_some()`. Works correctly when + /// called on `Arc` via auto-deref. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast this plan to a concrete type `T`, returning `None` + /// if the plan is not of that type. + /// + /// Works correctly when called on `Arc` via auto-deref, + /// unlike `(&arc as &dyn Any).downcast_ref::()` which would attempt to + /// downcast the `Arc` itself. + pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} + /// [`ExecutionPlan`] Invariant Level /// /// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan` @@ -1609,10 +1625,6 @@ mod tests { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { unimplemented!() } @@ -1682,10 +1694,6 @@ mod tests { "MyRenamedEmptyExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { unimplemented!() } @@ -1746,10 +1754,6 @@ mod tests { "MultiExprExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index fa684f3483a8..617a1a6cdaf5 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -17,7 +17,6 @@ //! Defines the EXPLAIN operator -use std::any::Any; use std::sync::Arc; use super::{DisplayAs, PlanProperties, SendableRecordBatchStream}; @@ -109,10 +108,6 @@ impl ExecutionPlan for ExplainExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 275fddd5e4d5..81088e5f89fd 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; @@ -474,10 +473,6 @@ impl ExecutionPlan for FilterExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index b64de91d9599..3027fb130f08 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -18,7 +18,7 @@ //! Defines the cross join plan for loading the left side of the cross join //! and producing batches in parallel for the right partitions -use std::{any::Any, sync::Arc, task::Poll}; +use std::{sync::Arc, task::Poll}; use super::utils::{ BatchSplitter, BatchTransformer, BuildProbeJoinMetrics, NoopBatchTransformer, @@ -271,10 +271,6 @@ impl ExecutionPlan for CrossJoinExec { "CrossJoinExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 8f0b5181e061..9667069a09a3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -20,7 +20,7 @@ use std::fmt; use std::mem::size_of; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, OnceLock}; -use std::{any::Any, vec}; +use std::vec; use crate::ExecutionPlanProperties; use crate::execution_plan::{ @@ -1179,10 +1179,6 @@ impl ExecutionPlan for HashJoinExec { "HashJoinExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 1582556b01e1..cdfe3a33ecbe 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -17,7 +17,6 @@ //! [`NestedLoopJoinExec`]: joins without equijoin (equality predicates). -use std::any::Any; use std::fmt::Formatter; use std::ops::{BitOr, ControlFlow}; use std::sync::Arc; @@ -535,10 +534,6 @@ impl ExecutionPlan for NestedLoopJoinExec { "NestedLoopJoinExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index fb1c4b160528..2b20089f8e22 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -500,10 +500,6 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { "PiecewiseMergeJoinExec" } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index e2c169e5580d..3f309431614a 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -19,7 +19,6 @@ //! A Sort-Merge join plan consumes two sorted children plans and produces //! joined output by given join type and other options. -use std::any::Any; use std::fmt::Formatter; use std::sync::Arc; @@ -420,10 +419,6 @@ impl ExecutionPlan for SortMergeJoinExec { "SortMergeJoinExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index f31cd8d446de..dbfdf9442678 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -25,7 +25,6 @@ //! This plan uses the [`OneSideHashJoiner`] object to facilitate join calculations //! for both its children. -use std::any::Any; use std::fmt::{self, Debug}; use std::mem::{size_of, size_of_val}; use std::sync::Arc; @@ -423,10 +422,6 @@ impl ExecutionPlan for SymmetricHashJoinExec { "SymmetricHashJoinExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 5632ef234af6..51bef5d24bd2 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -17,7 +17,6 @@ //! Defines the LIMIT plan -use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -155,10 +154,6 @@ impl ExecutionPlan for GlobalLimitExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -347,10 +342,6 @@ impl ExecutionPlan for LocalLimitExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index d607f2b440f6..3b9199058851 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -300,10 +300,6 @@ impl ExecutionPlan for LazyMemoryExec { "LazyMemoryExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index eaa895c82183..ae8e73cd74ad 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -17,7 +17,6 @@ //! EmptyRelation produce_one_row=true execution plan -use std::any::Any; use std::sync::Arc; use crate::coop::cooperative; @@ -130,10 +129,6 @@ impl ExecutionPlan for PlaceholderRowExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index cd80277156fc..9cc75a68fefc 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -34,7 +34,6 @@ use crate::filter_pushdown::{ }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties}; -use std::any::Any; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -283,10 +282,6 @@ impl ExecutionPlan for ProjectionExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -732,20 +727,19 @@ pub fn try_pushdown_through_join( pub fn remove_unnecessary_projections( plan: Arc, ) -> Result>> { - let maybe_modified = - if let Some(projection) = plan.as_any().downcast_ref::() { - // If the projection does not cause any change on the input, we can - // safely remove it: - if is_projection_removable(projection) { - return Ok(Transformed::yes(Arc::clone(projection.input()))); - } - // If it does, check if we can push it under its child(ren): - projection - .input() - .try_swapping_with_projection(projection)? - } else { - return Ok(Transformed::no(plan)); - }; + let maybe_modified = if let Some(projection) = plan.downcast_ref::() { + // If the projection does not cause any change on the input, we can + // safely remove it: + if is_projection_removable(projection) { + return Ok(Transformed::yes(Arc::clone(projection.input()))); + } + // If it does, check if we can push it under its child(ren): + projection + .input() + .try_swapping_with_projection(projection)? + } else { + return Ok(Transformed::no(plan)); + }; Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes)) } diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 049aa9563d52..35b787759441 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -141,10 +141,6 @@ impl ExecutionPlan for RecursiveQueryExec { "RecursiveQueryExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d3020c2756ff..8eefb2cbcdd7 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -23,7 +23,7 @@ use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::{any::Any, vec}; +use std::vec; use super::common::SharedMemoryReservation; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; @@ -902,10 +902,6 @@ impl ExecutionPlan for RepartitionExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 127998601fba..28b874523591 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -51,7 +51,6 @@ //! The plan concats incoming data with such last rows of previous input //! and continues partial sorting of the segments. -use std::any::Any; use std::fmt::Debug; use std::pin::Pin; use std::sync::Arc; @@ -260,10 +259,6 @@ impl ExecutionPlan for PartialSortExec { "PartialSortExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index bdf08823a29d..583bfa29b04a 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -19,7 +19,6 @@ //! It will do in-memory sorting if it has enough memory budget //! but spills to disk if needed. -use std::any::Any; use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -1176,10 +1175,6 @@ impl ExecutionPlan for SortExec { } } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -1247,7 +1242,6 @@ impl ExecutionPlan for SortExec { let children = self.children().into_iter().cloned().collect(); let new_sort = self.with_new_children(children)?; let mut new_sort = new_sort - .as_any() .downcast_ref::() .expect("cloned 1 lines above this line, we know the type") .clone(); @@ -1509,10 +1503,6 @@ mod tests { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 1e60c391f50d..b77cced35504 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -17,7 +17,6 @@ //! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream. -use std::any::Any; use std::sync::Arc; use crate::common::spawn_buffered; @@ -235,10 +234,6 @@ impl ExecutionPlan for SortPreservingMergeExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -1418,9 +1413,6 @@ mod tests { fn name(&self) -> &'static str { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 5a1206629ac7..250eb59f19b8 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -17,7 +17,6 @@ //! Generic plans for deferred execution: [`StreamingTableExec`] and [`PartitionStream`] -use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -234,10 +233,6 @@ impl ExecutionPlan for StreamingTableExec { "StreamingTableExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index 0630b8f17456..4c4724e4dcc4 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -17,7 +17,6 @@ //! Utilities for testing datafusion-physical-plan -use std::any::Any; use std::collections::HashMap; use std::fmt; use std::fmt::{Debug, Formatter}; @@ -133,10 +132,6 @@ impl ExecutionPlan for TestMemoryExec { "DataSourceExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 5458fa7ab826..200223b9b660 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -28,7 +28,6 @@ use crate::{ }; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{ - any::Any, pin::Pin, sync::{Arc, Weak}, task::{Context, Poll}, @@ -189,10 +188,6 @@ impl ExecutionPlan for MockExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -426,10 +421,6 @@ impl ExecutionPlan for BarrierExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -569,10 +560,6 @@ impl ExecutionPlan for ErrorExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -666,10 +653,6 @@ impl ExecutionPlan for StatisticsExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -776,10 +759,6 @@ impl ExecutionPlan for BlockingExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -929,10 +908,6 @@ impl ExecutionPlan for PanicExec { Self::static_name() } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index eb16375a2d64..20295b7e6fac 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -23,8 +23,8 @@ use std::borrow::Borrow; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; -use std::{any::Any, sync::Arc}; use super::{ ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, @@ -225,10 +225,6 @@ impl ExecutionPlan for UnionExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -593,10 +589,6 @@ impl ExecutionPlan for InterleaveExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -1411,7 +1403,6 @@ mod tests { // Downcast to verify it's a UnionExec let union = union_plan - .as_any() .downcast_ref::() .expect("Expected UnionExec"); @@ -1452,7 +1443,6 @@ mod tests { let union = UnionExec::try_new(vec![input1, input2])?; let union = union - .as_any() .downcast_ref::() .expect("expected UnionExec for multiple inputs"); diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index a70b29212033..c774ff09af33 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -18,8 +18,8 @@ //! Define a plan for unnesting values in columns that contain a list type. use std::cmp::{self, Ordering}; +use std::sync::Arc; use std::task::{Poll, ready}; -use std::{any::Any, sync::Arc}; use super::metrics::{ self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, @@ -230,10 +230,6 @@ impl ExecutionPlan for UnnestExec { "UnnestExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index d0c44c659c20..14f8ce5e95ff 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -20,7 +20,6 @@ //! the input data seen so far), which makes it appropriate when processing //! infinite inputs. -use std::any::Any; use std::cmp::{Ordering, min}; use std::collections::VecDeque; use std::pin::Pin; @@ -313,10 +312,6 @@ impl ExecutionPlan for BoundedWindowAggExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } @@ -1865,7 +1860,6 @@ mod tests { Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None)?); let plan = bounded_window_exec_pb_latent_range(input, 1, "hash", "sn")?; let plan = plan - .as_any() .downcast_ref::() .expect("expected BoundedWindowAggExec"); diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index c9958c875c6b..5098c8403406 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -17,7 +17,6 @@ //! Stream and channel implementations for window function expressions. -use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -214,10 +213,6 @@ impl ExecutionPlan for WindowAggExec { } /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index c2ef6bf071c4..0855dbf2fd63 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -178,10 +178,6 @@ impl ExecutionPlan for WorkTableExec { "WorkTableExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0f37e9ad3f94..4e37e9f9528e 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -333,7 +333,7 @@ impl protobuf::PhysicalPlanNode { Self: Sized, { let plan_clone = Arc::clone(&plan); - let plan = plan.as_any(); + let plan = plan.as_ref() as &dyn Any; if let Some(exec) = plan.downcast_ref::() { return protobuf::PhysicalPlanNode::try_from_explain_exec(exec, codec); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 10b50930b469..15639bcd25bd 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -969,7 +969,6 @@ fn roundtrip_parquet_exec_attaches_cached_reader_factory_after_roundtrip() -> Re roundtrip_test_and_return(exec_plan, &ctx, &codec, &proto_converter)?; let data_source = roundtripped - .as_any() .downcast_ref::() .ok_or_else(|| { internal_datafusion_err!("Expected DataSourceExec after roundtrip") @@ -1280,7 +1279,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec) -> Result<()> { let binding = node.inner(); - if let Some(udf) = (binding.as_ref() as &dyn Any).downcast_ref::() { + if let Some(udf) = binding.downcast_ref::() { let proto = MyRegexUdfNode { pattern: udf.pattern.clone(), }; @@ -1307,8 +1306,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec) -> Result<()> { let binding = node.inner(); - if let Some(udf) = (binding.as_ref() as &dyn Any).downcast_ref::() - { + if let Some(udf) = binding.downcast_ref::() { let proto = MyAggregateUdfNode { result: udf.result.clone(), }; @@ -1335,7 +1333,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec) -> Result<()> { let binding = node.inner(); - if let Some(udwf) = (binding.as_ref() as &dyn Any).downcast_ref::() { + if let Some(udwf) = binding.downcast_ref::() { let proto = CustomUDWFNode { payload: udwf.payload.clone(), }; @@ -1657,10 +1655,7 @@ fn roundtrip_csv_sink() -> Result<()> { &proto_converter, )?; - let roundtrip_plan = roundtrip_plan - .as_any() - .downcast_ref::() - .unwrap(); + let roundtrip_plan = roundtrip_plan.downcast_ref::().unwrap(); let csv_sink = roundtrip_plan .sink() .as_any() @@ -2522,7 +2517,7 @@ fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> { // The deserialized plan should have lit(true) instead of HashTableLookupExpr // Verify the filter predicate is a Literal(true) - let result_filter = result.as_any().downcast_ref::().unwrap(); + let result_filter = result.downcast_ref::().unwrap(); let predicate = result_filter.predicate(); let literal = predicate.as_any().downcast_ref::().unwrap(); assert_eq!(*literal.value(), ScalarValue::Boolean(Some(true))); @@ -2819,7 +2814,6 @@ fn test_expression_deduplication_arc_sharing() -> Result<()> { // Get the projection from the result let projection = result_plan - .as_any() .downcast_ref::() .expect("Expected ProjectionExec"); @@ -2928,7 +2922,6 @@ fn test_deduplication_within_plan_deserialization() -> Result<()> { // Check that the plan was deserialized correctly with deduplication let projection1 = plan1 - .as_any() .downcast_ref::() .expect("Expected ProjectionExec"); let exprs1: Vec<_> = projection1.expr().iter().collect(); @@ -2948,7 +2941,6 @@ fn test_deduplication_within_plan_deserialization() -> Result<()> { // Check that the second plan was also deserialized correctly let projection2 = plan2 - .as_any() .downcast_ref::() .expect("Expected ProjectionExec"); let exprs2: Vec<_> = projection2.expr().iter().collect(); diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs index 7a2da70352b0..17ca99ceff6e 100644 --- a/datafusion/substrait/src/physical_plan/producer.rs +++ b/datafusion/substrait/src/physical_plan/producer.rs @@ -51,7 +51,7 @@ pub fn to_substrait_rel( HashMap, ), ) -> Result> { - if let Some(data_source_exec) = plan.as_any().downcast_ref::() + if let Some(data_source_exec) = plan.downcast_ref::() && let Some((file_config, _)) = data_source_exec.downcast_to_file_source::() { diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index 70b6be3ae2ab..9e5fd605d75c 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -100,10 +100,6 @@ impl ExecutionPlan for CustomExec { "CustomExec" } - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { self.projected_schema.clone() } @@ -231,10 +227,6 @@ The `scan` method of the `TableProvider` returns a `Result &dyn Any { -# self -# } -# # fn schema(&self) -> SchemaRef { # self.projected_schema.clone() # } @@ -431,10 +423,6 @@ This will allow you to use the custom table provider in DataFusion. For example, # "CustomExec" # } # -# fn as_any(&self) -> &dyn Any { -# self -# } -# # fn schema(&self) -> SchemaRef { # self.projected_schema.clone() # } diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index d8758e134248..7fb1c4ece79e 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -124,13 +124,13 @@ let mut stats = Arc::unwrap_or_clone(plan.partition_statistics(None)?); stats.column_statistics[0].min_value = ...; ``` -### Remove `as_any` from `ScalarUDFImpl`, `AggregateUDFImpl`, and `WindowUDFImpl` +### Remove `as_any` from `ScalarUDFImpl`, `AggregateUDFImpl`, `WindowUDFImpl`, and `ExecutionPlan` Now that we have a more recent minimum version of Rust, we can take advantage of -trait upcasting for UDFs. This reduces the amount of boilerplate code that -users need to do to create a UDF. In your implementations of `ScalarUDFImpl`, -`AggregateUDFImpl`, and `WindowUDFImpl`, you can simply remove the `as_any` -function. The below diffs are examples from the associated PRs. +trait upcasting. This reduces the amount of boilerplate code that +users need to implement. In your implementations of `ScalarUDFImpl`, +`AggregateUDFImpl`, `WindowUDFImpl`, and `ExecutionPlan`, you can simply remove +the `as_any` function. The below diffs are examples from the associated PRs. **Scalar UDFs:** @@ -180,29 +180,50 @@ function. The below diffs are examples from the associated PRs. } ``` -If you have a function that is downcasting a UDF, you can replace -the call to `.as_any()` with `.as_ref() as &dyn Any`. For example +**Execution Plans:** + +```diff + impl ExecutionPlan for MyExec { +- fn as_any(&self) -> &dyn Any { +- self +- } +- + fn name(&self) -> &'static str { + "MyExec" + } + + ... + } +``` + +If you have code that is downcasting, you can use the new `downcast_ref` +and `is` methods defined directly on each trait object: **Before:** ```rust,ignore -let is_async = func - .inner() - .as_any() - .downcast_ref::() - .is_some(); +let exec = plan.as_any().downcast_ref::().unwrap(); +let udf = scalar_udf.as_any().downcast_ref::().unwrap(); ``` **After:** ```rust,ignore -let is_async = (func - .inner() - .as_ref() as &dyn Any) - .downcast_ref::() - .is_some(); +let exec = plan.downcast_ref::().unwrap(); +let udf = scalar_udf.downcast_ref::().unwrap(); ``` +These methods are available on `dyn ExecutionPlan`, `dyn ScalarUDFImpl`, +`dyn AggregateUDFImpl`, and `dyn WindowUDFImpl`. They work correctly +whether the value is a bare reference or behind an `Arc` (Rust +auto-derefs through the `Arc`). + +> **Warning:** Do not cast an `Arc` directly to `&dyn Any`. +> Writing `(&plan as &dyn Any)` gives you an `Any` reference to the +> **`Arc` itself**, not the underlying trait object, so the downcast will +> always return `None`. Use the `downcast_ref` method above instead, or +> dereference through the `Arc` first with `plan.as_ref() as &dyn Any`. + ### Avro API and timestamp decoding changes DataFusion has switched to use `arrow-avro` (see [#17861]) when reading avro files