From 2ae8cd8a67af835295b1cecc28d7a76a466ef408 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 22 Apr 2022 15:07:31 +0100 Subject: [PATCH] Make ExecutionPlan sync (#2307) --- .../src/execution_plans/distributed_query.rs | 4 +- .../src/execution_plans/shuffle_reader.rs | 4 +- .../src/execution_plans/shuffle_writer.rs | 10 ++--- .../src/execution_plans/unresolved_shuffle.rs | 4 +- ballista/rust/core/src/serde/mod.rs | 3 +- ballista/rust/executor/src/collect.rs | 10 ++--- .../examples/custom_datasource.rs | 7 ++-- .../core/src/datasource/file_format/csv.rs | 2 +- .../core/src/datasource/file_format/json.rs | 2 +- .../src/datasource/file_format/parquet.rs | 2 +- datafusion/core/src/datasource/memory.rs | 10 ++--- .../aggregate_statistics.rs | 2 +- .../core/src/physical_plan/aggregates/mod.rs | 15 +++----- datafusion/core/src/physical_plan/analyze.rs | 6 +-- .../src/physical_plan/coalesce_batches.rs | 8 ++-- .../src/physical_plan/coalesce_partitions.rs | 9 ++--- datafusion/core/src/physical_plan/common.rs | 2 +- .../core/src/physical_plan/cross_join.rs | 7 ++-- datafusion/core/src/physical_plan/empty.rs | 12 +++--- datafusion/core/src/physical_plan/explain.rs | 4 +- .../src/physical_plan/file_format/avro.rs | 6 +-- .../core/src/physical_plan/file_format/csv.rs | 14 +++---- .../src/physical_plan/file_format/json.rs | 12 +++--- .../src/physical_plan/file_format/parquet.rs | 14 +++---- datafusion/core/src/physical_plan/filter.rs | 8 ++-- .../core/src/physical_plan/hash_join.rs | 34 ++++++++--------- datafusion/core/src/physical_plan/limit.rs | 13 +++---- datafusion/core/src/physical_plan/memory.rs | 8 ++-- datafusion/core/src/physical_plan/mod.rs | 10 ++--- datafusion/core/src/physical_plan/planner.rs | 4 +- .../core/src/physical_plan/projection.rs | 8 ++-- .../core/src/physical_plan/repartition.rs | 38 +++++++++---------- .../core/src/physical_plan/sort_merge_join.rs | 12 +++--- .../core/src/physical_plan/sorts/sort.rs | 5 +-- .../sorts/sort_preserving_merge.rs | 29 +++++--------- datafusion/core/src/physical_plan/union.rs | 6 +-- datafusion/core/src/physical_plan/values.rs | 4 +- .../physical_plan/windows/window_agg_exec.rs | 6 +-- datafusion/core/src/scheduler/mod.rs | 1 - .../core/src/scheduler/pipeline/execution.rs | 29 +++----------- datafusion/core/src/test/exec.rs | 16 +++----- datafusion/core/tests/custom_sources.rs | 9 ++++- .../core/tests/provider_filter_pushdown.rs | 3 +- datafusion/core/tests/statistics.rs | 3 +- datafusion/core/tests/user_defined_plan.rs | 4 +- 45 files changed, 160 insertions(+), 259 deletions(-) diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs index ed3c9fceb01bd..0b20acb47969a 100644 --- a/ballista/rust/core/src/execution_plans/distributed_query.rs +++ b/ballista/rust/core/src/execution_plans/distributed_query.rs @@ -41,7 +41,6 @@ use datafusion::physical_plan::{ use crate::serde::protobuf::execute_query_params::OptionalSessionId; use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec}; -use async_trait::async_trait; use datafusion::arrow::error::{ArrowError, Result as ArrowResult}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::execution::context::TaskContext; @@ -122,7 +121,6 @@ impl DistributedQueryExec { } } -#[async_trait] impl ExecutionPlan for DistributedQueryExec { fn as_any(&self) -> &dyn Any { self @@ -162,7 +160,7 @@ impl ExecutionPlan for DistributedQueryExec { })) } - async fn execute( + fn execute( &self, partition: usize, _context: Arc, diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index 27252b980d117..3046a2276f899 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use crate::client::BallistaClient; use crate::serde::scheduler::{PartitionLocation, PartitionStats}; -use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::error::{DataFusionError, Result}; @@ -64,7 +63,6 @@ impl ShuffleReaderExec { } } -#[async_trait] impl ExecutionPlan for ShuffleReaderExec { fn as_any(&self) -> &dyn Any { self @@ -101,7 +99,7 @@ impl ExecutionPlan for ShuffleReaderExec { )) } - async fn execute( + fn execute( &self, partition: usize, _context: Arc, diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index f5c98b2001153..45a102185c447 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -33,7 +33,6 @@ use crate::utils; use crate::serde::protobuf::ShuffleWritePartition; use crate::serde::scheduler::PartitionStats; -use async_trait::async_trait; use datafusion::arrow::array::{ ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, UInt64Builder, }; @@ -155,7 +154,7 @@ impl ShuffleWriterExec { async move { let now = Instant::now(); - let mut stream = plan.execute(input_partition, context).await?; + let mut stream = plan.execute(input_partition, context)?; match output_partitioning { None => { @@ -293,7 +292,6 @@ impl ShuffleWriterExec { } } -#[async_trait] impl ExecutionPlan for ShuffleWriterExec { fn as_any(&self) -> &dyn Any { self @@ -336,7 +334,7 @@ impl ExecutionPlan for ShuffleWriterExec { )?)) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -459,7 +457,7 @@ mod tests { work_dir.into_path().to_str().unwrap().to_owned(), Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)), )?; - let mut stream = query_stage.execute(0, task_ctx).await?; + let mut stream = query_stage.execute(0, task_ctx)?; let batches = utils::collect_stream(&mut stream) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; @@ -516,7 +514,7 @@ mod tests { work_dir.into_path().to_str().unwrap().to_owned(), Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)), )?; - let mut stream = query_stage.execute(0, task_ctx).await?; + let mut stream = query_stage.execute(0, task_ctx)?; let batches = utils::collect_stream(&mut stream) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs index e1eecdd92543f..15d403fb6a453 100644 --- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs +++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs @@ -18,7 +18,6 @@ use std::any::Any; use std::sync::Arc; -use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::TaskContext; @@ -63,7 +62,6 @@ impl UnresolvedShuffleExec { } } -#[async_trait] impl ExecutionPlan for UnresolvedShuffleExec { fn as_any(&self) -> &dyn Any { self @@ -101,7 +99,7 @@ impl ExecutionPlan for UnresolvedShuffleExec { )) } - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index ed41ce61c4c46..bc2d7ff6b83ee 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -477,7 +477,6 @@ mod tests { } } - #[async_trait] impl ExecutionPlan for TopKExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -515,7 +514,7 @@ mod tests { } /// Execute one partition and return an iterator over RecordBatch - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs index 1bb4acaf8ee52..54e97550a68fc 100644 --- a/ballista/rust/executor/src/collect.rs +++ b/ballista/rust/executor/src/collect.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::{any::Any, pin::Pin}; -use async_trait::async_trait; use datafusion::arrow::{ datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch, }; @@ -49,7 +48,6 @@ impl CollectExec { } } -#[async_trait] impl ExecutionPlan for CollectExec { fn as_any(&self) -> &dyn Any { self @@ -78,7 +76,7 @@ impl ExecutionPlan for CollectExec { unimplemented!() } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -86,10 +84,8 @@ impl ExecutionPlan for CollectExec { assert_eq!(0, partition); let num_partitions = self.plan.output_partitioning().partition_count(); - let futures = (0..num_partitions).map(|i| self.plan.execute(i, context.clone())); - let streams = futures::future::join_all(futures) - .await - .into_iter() + let streams = (0..num_partitions) + .map(|i| self.plan.execute(i, context.clone())) .collect::>>() .map_err(|e| DataFusionError::Execution(format!("BallistaError: {:?}", e)))?; diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index a4b9fda1aa751..d8a908986839e 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -196,7 +196,6 @@ impl CustomExec { } } -#[async_trait] impl ExecutionPlan for CustomExec { fn as_any(&self) -> &dyn Any { self @@ -225,7 +224,7 @@ impl ExecutionPlan for CustomExec { Ok(self) } - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, @@ -243,7 +242,7 @@ impl ExecutionPlan for CustomExec { account_array.append_value(user.bank_account)?; } - return Ok(Box::pin(MemoryStream::try_new( + Ok(Box::pin(MemoryStream::try_new( vec![RecordBatch::try_new( self.projected_schema.clone(), vec![ @@ -253,7 +252,7 @@ impl ExecutionPlan for CustomExec { )?], self.schema(), None, - )?)); + )?)) } fn statistics(&self) -> Statistics { diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 87bc6be1a5c27..cbc2adca1c4cd 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -161,7 +161,7 @@ mod tests { let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]); let exec = get_exec("aggregate_test_100.csv", &projection, None).await?; let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx).await?; + let stream = exec.execute(0, task_ctx)?; let tt_batches: i32 = stream .map(|batch| { diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index c56b843530c15..cd4fd5810f36e 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -131,7 +131,7 @@ mod tests { let projection = None; let exec = get_exec(&projection, None).await?; let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx).await?; + let stream = exec.execute(0, task_ctx)?; let tt_batches: i32 = stream .map(|batch| { diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index daadff97d9ae1..a9a0c788b995f 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -512,7 +512,7 @@ mod tests { let projection = None; let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; let task_ctx = ctx.task_ctx(); - let stream = exec.execute(0, task_ctx).await?; + let stream = exec.execute(0, task_ctx)?; let tt_batches = stream .map(|batch| { diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 90e429b187ca8..72630b39c675b 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -76,7 +76,7 @@ impl MemTable { let context1 = context.clone(); let exec = exec.clone(); tokio::spawn(async move { - let stream = exec.execute(part_i, context1.clone()).await?; + let stream = exec.execute(part_i, context1.clone())?; common::collect(stream).await }) }) @@ -103,7 +103,7 @@ impl MemTable { let mut output_partitions = vec![]; for i in 0..exec.output_partitioning().partition_count() { // execute this *output* partition and collect all batches - let mut stream = exec.execute(i, context.clone()).await?; + let mut stream = exec.execute(i, context.clone())?; let mut batches = vec![]; while let Some(result) = stream.next().await { batches.push(result?); @@ -177,7 +177,7 @@ mod tests { // scan with projection let exec = provider.scan(&Some(vec![2, 1]), &[], None).await?; - let mut it = exec.execute(0, task_ctx).await?; + let mut it = exec.execute(0, task_ctx)?; let batch2 = it.next().await.unwrap()?; assert_eq!(2, batch2.schema().fields().len()); assert_eq!("c", batch2.schema().field(0).name()); @@ -209,7 +209,7 @@ mod tests { let provider = MemTable::try_new(schema, vec![vec![batch]])?; let exec = provider.scan(&None, &[], None).await?; - let mut it = exec.execute(0, task_ctx).await?; + let mut it = exec.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; assert_eq!(3, batch1.schema().fields().len()); assert_eq!(3, batch1.num_columns()); @@ -365,7 +365,7 @@ mod tests { MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?; let exec = provider.scan(&None, &[], None).await?; - let mut it = exec.execute(0, task_ctx).await?; + let mut it = exec.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; assert_eq!(3, batch1.schema().fields().len()); assert_eq!(3, batch1.num_columns()); diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 9c548ab9b5fb9..4cf96d2350ebb 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -304,7 +304,7 @@ mod tests { // A ProjectionExec is a sign that the count optimization was applied assert!(optimized.as_any().is::()); - let result = common::collect(optimized.execute(0, task_ctx).await?).await?; + let result = common::collect(optimized.execute(0, task_ctx)?).await?; assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col]))); assert_eq!( result[0] diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 5e1da793c3ea7..3682ec6eb8e99 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -29,7 +29,6 @@ use crate::physical_plan::{ }; use arrow::array::ArrayRef; use arrow::datatypes::{Field, Schema, SchemaRef}; -use async_trait::async_trait; use datafusion_common::Result; use datafusion_expr::Accumulator; use datafusion_physical_expr::expressions::Column; @@ -145,7 +144,6 @@ impl AggregateExec { } } -#[async_trait] impl ExecutionPlan for AggregateExec { /// Return a reference to Any that can be used for down-casting fn as_any(&self) -> &dyn Any { @@ -196,12 +194,12 @@ impl ExecutionPlan for AggregateExec { )?)) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, ) -> Result { - let input = self.input.execute(partition, context).await?; + let input = self.input.execute(partition, context)?; let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect(); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); @@ -417,7 +415,6 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; - use async_trait::async_trait; use datafusion_common::{DataFusionError, Result}; use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, PhysicalSortExpr}; use futures::{FutureExt, Stream}; @@ -489,8 +486,7 @@ mod tests { )?); let result = - common::collect(partial_aggregate.execute(0, task_ctx.clone()).await?) - .await?; + common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?; let expected = vec![ "+---+---------------+-------------+", @@ -522,7 +518,7 @@ mod tests { )?); let result = - common::collect(merged_aggregate.execute(0, task_ctx.clone()).await?).await?; + common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?; assert_eq!(result.len(), 1); let batch = &result[0]; @@ -556,7 +552,6 @@ mod tests { pub yield_first: bool, } - #[async_trait] impl ExecutionPlan for TestYieldingExec { fn as_any(&self) -> &dyn Any { self @@ -587,7 +582,7 @@ mod tests { ))) } - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index f8050f16ce398..c2f08c69a4ba8 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -33,7 +33,6 @@ use futures::StreamExt; use super::expressions::PhysicalSortExpr; use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream}; use crate::execution::context::TaskContext; -use async_trait::async_trait; /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input, /// discards the results, and then prints out an annotated plan with metrics @@ -58,7 +57,6 @@ impl AnalyzeExec { } } -#[async_trait] impl ExecutionPlan for AnalyzeExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -102,7 +100,7 @@ impl ExecutionPlan for AnalyzeExec { ))) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -126,7 +124,7 @@ impl ExecutionPlan for AnalyzeExec { let (tx, rx) = tokio::sync::mpsc::channel(input_partitions); let captured_input = self.input.clone(); - let mut input_stream = captured_input.execute(0, context).await?; + let mut input_stream = captured_input.execute(0, context)?; let captured_schema = self.schema.clone(); let verbose = self.verbose; diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 482b0ee7799e2..75ecaf53eae91 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -34,7 +34,6 @@ use arrow::compute::kernels::concat::concat; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; use log::debug; @@ -75,7 +74,6 @@ impl CoalesceBatchesExec { } } -#[async_trait] impl ExecutionPlan for CoalesceBatchesExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -116,13 +114,13 @@ impl ExecutionPlan for CoalesceBatchesExec { ))) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, ) -> Result { Ok(Box::pin(CoalesceBatchesStream { - input: self.input.execute(partition, context).await?, + input: self.input.execute(partition, context)?, schema: self.input.schema(), target_batch_size: self.target_batch_size, buffer: Vec::new(), @@ -348,7 +346,7 @@ mod tests { for i in 0..output_partition_count { // execute this *output* partition and collect all batches let task_ctx = session_ctx.task_ctx(); - let mut stream = exec.execute(i, task_ctx.clone()).await?; + let mut stream = exec.execute(i, task_ctx.clone())?; let mut batches = vec![]; while let Some(result) = stream.next().await { batches.push(result?); diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index 35f75db03c3bc..11fcd5d50be87 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -25,8 +25,6 @@ use std::task::Poll; use futures::Stream; use tokio::sync::mpsc; -use async_trait::async_trait; - use arrow::record_batch::RecordBatch; use arrow::{datatypes::SchemaRef, error::Result as ArrowResult}; @@ -66,7 +64,6 @@ impl CoalescePartitionsExec { } } -#[async_trait] impl ExecutionPlan for CoalescePartitionsExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -101,7 +98,7 @@ impl ExecutionPlan for CoalescePartitionsExec { Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -121,7 +118,7 @@ impl ExecutionPlan for CoalescePartitionsExec { )), 1 => { // bypass any threading / metrics if there is a single partition - self.input.execute(0, context).await + self.input.execute(0, context) } _ => { let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); @@ -252,7 +249,7 @@ mod tests { assert_eq!(merge.output_partitioning().partition_count(), 1); // the result should contain 4 batches (one per input partition) - let iter = merge.execute(0, task_ctx).await?; + let iter = merge.execute(0, task_ctx)?; let batches = common::collect(iter).await?; assert_eq!(batches.len(), num_partitions); diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 68bd676ddab9d..24df647dcde99 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -179,7 +179,7 @@ pub(crate) fn spawn_execution( context: Arc, ) -> JoinHandle<()> { tokio::spawn(async move { - let mut stream = match input.execute(partition, context).await { + let mut stream = match input.execute(partition, context) { Err(e) => { // If send fails, plan being torn // down, no place to send the error diff --git a/datafusion/core/src/physical_plan/cross_join.rs b/datafusion/core/src/physical_plan/cross_join.rs index 2846af9c55c61..e3f25fc566abc 100644 --- a/datafusion/core/src/physical_plan/cross_join.rs +++ b/datafusion/core/src/physical_plan/cross_join.rs @@ -111,7 +111,7 @@ async fn load_left_input( // merge all left parts into a single stream let merge = CoalescePartitionsExec::new(left.clone()); - let stream = merge.execute(0, context).await?; + let stream = merge.execute(0, context)?; // Load all batches and count the rows let (batches, num_rows) = stream @@ -133,7 +133,6 @@ async fn load_left_input( Ok(merged_batch) } -#[async_trait] impl ExecutionPlan for CrossJoinExec { fn as_any(&self) -> &dyn Any { self @@ -169,12 +168,12 @@ impl ExecutionPlan for CrossJoinExec { false } - async fn execute( + fn execute( &self, partition: usize, context: Arc, ) -> Result { - let stream = self.right.execute(partition, context.clone()).await?; + let stream = self.right.execute(partition, context.clone())?; let left_fut = self .left_fut diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index 43e749e1d7dcc..bba87e1906400 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -33,7 +33,6 @@ use super::expressions::PhysicalSortExpr; use super::{common, SendableRecordBatchStream, Statistics}; use crate::execution::context::TaskContext; -use async_trait::async_trait; /// Execution plan for empty relation (produces no rows) #[derive(Debug)] @@ -76,7 +75,6 @@ impl EmptyExec { } } -#[async_trait] impl ExecutionPlan for EmptyExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -114,7 +112,7 @@ impl ExecutionPlan for EmptyExec { ))) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -172,7 +170,7 @@ mod tests { assert_eq!(empty.schema(), schema); // we should have no results - let iter = empty.execute(0, task_ctx).await?; + let iter = empty.execute(0, task_ctx)?; let batches = common::collect(iter).await?; assert!(batches.is_empty()); @@ -208,8 +206,8 @@ mod tests { let empty = EmptyExec::new(false, schema); // ask for the wrong partition - assert!(empty.execute(1, task_ctx.clone()).await.is_err()); - assert!(empty.execute(20, task_ctx).await.is_err()); + assert!(empty.execute(1, task_ctx.clone()).is_err()); + assert!(empty.execute(20, task_ctx).is_err()); Ok(()) } @@ -220,7 +218,7 @@ mod tests { let schema = test_util::aggr_test_schema(); let empty = EmptyExec::new(true, schema); - let iter = empty.execute(0, task_ctx).await?; + let iter = empty.execute(0, task_ctx)?; let batches = common::collect(iter).await?; // should have one item diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs index fb66315882087..fdc139a7e908c 100644 --- a/datafusion/core/src/physical_plan/explain.rs +++ b/datafusion/core/src/physical_plan/explain.rs @@ -34,7 +34,6 @@ use log::debug; use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream}; use crate::execution::context::TaskContext; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; -use async_trait::async_trait; /// Explain execution plan operator. This operator contains the string /// values of the various plans it has when it is created, and passes @@ -74,7 +73,6 @@ impl ExplainExec { } } -#[async_trait] impl ExecutionPlan for ExplainExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -110,7 +108,7 @@ impl ExecutionPlan for ExplainExec { Ok(self) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index 68f8f2f905fc7..eed0161a2ea7d 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -28,7 +28,6 @@ use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use crate::execution::context::TaskContext; -use async_trait::async_trait; use std::any::Any; use std::sync::Arc; @@ -61,7 +60,6 @@ impl AvroExec { } } -#[async_trait] impl ExecutionPlan for AvroExec { fn as_any(&self) -> &dyn Any { self @@ -95,7 +93,7 @@ impl ExecutionPlan for AvroExec { } #[cfg(not(feature = "avro"))] - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, @@ -106,7 +104,7 @@ impl ExecutionPlan for AvroExec { } #[cfg(feature = "avro")] - async fn execute( + fn execute( &self, partition: usize, context: Arc, diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 8aea607ea46b8..96bceb2251f68 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -26,7 +26,6 @@ use crate::physical_plan::{ use arrow::csv; use arrow::datatypes::SchemaRef; -use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use std::any::Any; use std::fs; @@ -75,7 +74,6 @@ impl CsvExec { } } -#[async_trait] impl ExecutionPlan for CsvExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -112,7 +110,7 @@ impl ExecutionPlan for CsvExec { Ok(self) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -191,7 +189,7 @@ pub async fn plan_to_csv( let file = fs::File::create(path)?; let mut writer = csv::Writer::new(file); let task_ctx = Arc::new(TaskContext::from(state)); - let stream = plan.execute(i, task_ctx).await?; + let stream = plan.execute(i, task_ctx)?; let handle: JoinHandle> = task::spawn(async move { stream .map(|batch| writer.write(&batch?)) @@ -250,7 +248,7 @@ mod tests { assert_eq!(3, csv.projected_schema.fields().len()); assert_eq!(3, csv.schema().fields().len()); - let mut stream = csv.execute(0, task_ctx).await?; + let mut stream = csv.execute(0, task_ctx)?; let batch = stream.next().await.unwrap()?; assert_eq!(3, batch.num_columns()); assert_eq!(100, batch.num_rows()); @@ -297,7 +295,7 @@ mod tests { assert_eq!(13, csv.projected_schema.fields().len()); assert_eq!(13, csv.schema().fields().len()); - let mut it = csv.execute(0, task_ctx).await?; + let mut it = csv.execute(0, task_ctx)?; let batch = it.next().await.unwrap()?; assert_eq!(13, batch.num_columns()); assert_eq!(5, batch.num_rows()); @@ -344,7 +342,7 @@ mod tests { assert_eq!(14, csv.projected_schema.fields().len()); assert_eq!(14, csv.schema().fields().len()); - let mut it = csv.execute(0, task_ctx).await?; + let mut it = csv.execute(0, task_ctx)?; let batch = it.next().await.unwrap()?; assert_eq!(14, batch.num_columns()); assert_eq!(5, batch.num_rows()); @@ -398,7 +396,7 @@ mod tests { assert_eq!(2, csv.projected_schema.fields().len()); assert_eq!(2, csv.schema().fields().len()); - let mut it = csv.execute(0, task_ctx).await?; + let mut it = csv.execute(0, task_ctx)?; let batch = it.next().await.unwrap()?; assert_eq!(2, batch.num_columns()); assert_eq!(100, batch.num_rows()); diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 5c02a9c92d7a6..818496d130528 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -17,7 +17,6 @@ //! Execution plan for reading line-delimited JSON files use arrow::json::reader::DecoderOptions; -use async_trait::async_trait; use crate::error::{DataFusionError, Result}; use crate::execution::context::SessionState; @@ -58,7 +57,6 @@ impl NdJsonExec { } } -#[async_trait] impl ExecutionPlan for NdJsonExec { fn as_any(&self) -> &dyn Any { self @@ -91,7 +89,7 @@ impl ExecutionPlan for NdJsonExec { Ok(self) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -168,7 +166,7 @@ pub async fn plan_to_json( let file = fs::File::create(path)?; let mut writer = json::LineDelimitedWriter::new(file); let task_ctx = Arc::new(TaskContext::from(state)); - let stream = plan.execute(i, task_ctx).await?; + let stream = plan.execute(i, task_ctx)?; let handle: JoinHandle> = task::spawn(async move { stream .map(|batch| writer.write(batch?)) @@ -255,7 +253,7 @@ mod tests { &DataType::Utf8 ); - let mut it = exec.execute(0, task_ctx).await?; + let mut it = exec.execute(0, task_ctx)?; let batch = it.next().await.unwrap()?; assert_eq!(batch.num_rows(), 3); @@ -296,7 +294,7 @@ mod tests { table_partition_cols: vec![], }); - let mut it = exec.execute(0, task_ctx).await?; + let mut it = exec.execute(0, task_ctx)?; let batch = it.next().await.unwrap()?; assert_eq!(batch.num_rows(), 3); @@ -335,7 +333,7 @@ mod tests { inferred_schema.field_with_name("c").unwrap(); inferred_schema.field_with_name("d").unwrap_err(); - let mut it = exec.execute(0, task_ctx).await?; + let mut it = exec.execute(0, task_ctx)?; let batch = it.next().await.unwrap()?; assert_eq!(batch.num_rows(), 4); diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index df7327aa0aa47..d2e156f32ed95 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -33,7 +33,6 @@ use arrow::{ error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; -use async_trait::async_trait; use futures::{Stream, StreamExt, TryStreamExt}; use log::debug; use parquet::arrow::{ @@ -165,7 +164,6 @@ impl ParquetFileMetrics { } } -#[async_trait] impl ExecutionPlan for ParquetExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -201,7 +199,7 @@ impl ExecutionPlan for ParquetExec { Ok(self) } - async fn execute( + fn execute( &self, partition_index: usize, context: Arc, @@ -592,7 +590,7 @@ pub async fn plan_to_parquet( writer_properties.clone(), )?; let task_ctx = Arc::new(TaskContext::from(state)); - let stream = plan.execute(i, task_ctx).await?; + let stream = plan.execute(i, task_ctx)?; let handle: tokio::task::JoinHandle> = tokio::task::spawn(async move { stream @@ -1059,7 +1057,7 @@ mod tests { ); assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); - let mut results = parquet_exec.execute(0, task_ctx).await?; + let mut results = parquet_exec.execute(0, task_ctx)?; let batch = results.next().await.unwrap()?; assert_eq!(8, batch.num_rows()); @@ -1111,7 +1109,7 @@ mod tests { None, ); assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); - let results = parquet_exec.execute(0, task_ctx).await?.next().await; + let results = parquet_exec.execute(0, task_ctx)?.next().await; if let Some(expected_row_num) = expected_row_num { let batch = results.unwrap()?; @@ -1190,7 +1188,7 @@ mod tests { ); assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); - let mut results = parquet_exec.execute(0, task_ctx).await?; + let mut results = parquet_exec.execute(0, task_ctx)?; let batch = results.next().await.unwrap()?; let expected = vec![ "+----+----------+-------------+-------+", @@ -1247,7 +1245,7 @@ mod tests { None, ); - let mut results = parquet_exec.execute(0, task_ctx).await?; + let mut results = parquet_exec.execute(0, task_ctx)?; let batch = results.next().await.unwrap(); // invalid file should produce an error to that effect assert_contains!( diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index 158bedf2849dc..4aa6453bfb73c 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -36,7 +36,6 @@ use arrow::datatypes::{DataType, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use async_trait::async_trait; use log::debug; use crate::execution::context::TaskContext; @@ -84,7 +83,6 @@ impl FilterExec { } } -#[async_trait] impl ExecutionPlan for FilterExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -129,7 +127,7 @@ impl ExecutionPlan for FilterExec { )?)) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -137,9 +135,9 @@ impl ExecutionPlan for FilterExec { debug!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(FilterExecStream { - schema: self.input.schema().clone(), + schema: self.input.schema(), predicate: self.predicate.clone(), - input: self.input.execute(partition, context).await?, + input: self.input.execute(partition, context)?, baseline_metrics, })) } diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs index 31882c63ce95f..8a4a342c11f69 100644 --- a/datafusion/core/src/physical_plan/hash_join.rs +++ b/datafusion/core/src/physical_plan/hash_join.rs @@ -34,7 +34,6 @@ use std::sync::Arc; use std::{any::Any, usize}; use std::{time::Instant, vec}; -use async_trait::async_trait; use futures::{ready, Stream, StreamExt, TryStreamExt}; use arrow::array::{new_null_array, Array}; @@ -250,7 +249,6 @@ impl HashJoinExec { } } -#[async_trait] impl ExecutionPlan for HashJoinExec { fn as_any(&self) -> &dyn Any { self @@ -290,7 +288,7 @@ impl ExecutionPlan for HashJoinExec { false } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -318,7 +316,7 @@ impl ExecutionPlan for HashJoinExec { // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. - let right_stream = self.right.execute(partition, context).await?; + let right_stream = self.right.execute(partition, context)?; Ok(Box::pin(HashJoinStream { schema: self.schema(), @@ -375,7 +373,7 @@ async fn collect_left_input( // merge all left parts into a single stream let merge = CoalescePartitionsExec::new(left); - let stream = merge.execute(0, context).await?; + let stream = merge.execute(0, context)?; // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream @@ -430,7 +428,7 @@ async fn partitioned_left_input( let start = Instant::now(); // Load 1 partition of left side in memory - let stream = left.execute(partition, context.clone()).await?; + let stream = left.execute(partition, context.clone())?; // This operation performs 2 steps at once: // 1. creates a [JoinHashMap] of all batches from the stream @@ -1122,7 +1120,7 @@ mod tests { let join = join(left, right, on, join_type, null_equals_null)?; let columns = columns(&join.schema()); - let stream = join.execute(0, context).await?; + let stream = join.execute(0, context)?; let batches = common::collect(stream).await?; Ok((columns, batches)) @@ -1167,7 +1165,7 @@ mod tests { let mut batches = vec![]; for i in 0..partition_count { - let stream = join.execute(i, context.clone()).await?; + let stream = join.execute(i, context.clone())?; let more_batches = common::collect(stream).await?; batches.extend( more_batches @@ -1446,7 +1444,7 @@ mod tests { assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); // first part - let stream = join.execute(0, task_ctx.clone()).await?; + let stream = join.execute(0, task_ctx.clone())?; let batches = common::collect(stream).await?; assert_eq!(batches.len(), 1); @@ -1460,7 +1458,7 @@ mod tests { assert_batches_sorted_eq!(expected, &batches); // second part - let stream = join.execute(1, task_ctx.clone()).await?; + let stream = join.execute(1, task_ctx.clone())?; let batches = common::collect(stream).await?; assert_eq!(batches.len(), 1); let expected = vec![ @@ -1513,7 +1511,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let stream = join.execute(0, task_ctx).await.unwrap(); + let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); let expected = vec![ @@ -1556,7 +1554,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); - let stream = join.execute(0, task_ctx).await.unwrap(); + let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); let expected = vec![ @@ -1597,7 +1595,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let stream = join.execute(0, task_ctx).await.unwrap(); + let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); let expected = vec![ @@ -1634,7 +1632,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); - let stream = join.execute(0, task_ctx).await.unwrap(); + let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); let expected = vec![ @@ -1762,7 +1760,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1"]); - let stream = join.execute(0, task_ctx).await?; + let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; let expected = vec![ @@ -1803,7 +1801,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1"]); - let stream = join.execute(0, task_ctx).await?; + let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; let expected = vec![ @@ -1921,7 +1919,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); - let stream = join.execute(0, task_ctx).await?; + let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; let expected = vec![ @@ -2014,7 +2012,7 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a", "b", "c", "a", "b", "c"]); - let stream = join.execute(0, task_ctx).await?; + let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; let expected = vec![ diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 71e08afd26122..00516db0bf751 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -43,7 +43,6 @@ use super::{ }; use crate::execution::context::TaskContext; -use async_trait::async_trait; /// Limit execution plan #[derive(Debug)] @@ -77,7 +76,6 @@ impl GlobalLimitExec { } } -#[async_trait] impl ExecutionPlan for GlobalLimitExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -127,7 +125,7 @@ impl ExecutionPlan for GlobalLimitExec { ))) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -152,7 +150,7 @@ impl ExecutionPlan for GlobalLimitExec { } let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let stream = self.input.execute(0, context).await?; + let stream = self.input.execute(0, context)?; Ok(Box::pin(LimitStream::new( stream, self.limit, @@ -230,7 +228,6 @@ impl LocalLimitExec { } } -#[async_trait] impl ExecutionPlan for LocalLimitExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -282,14 +279,14 @@ impl ExecutionPlan for LocalLimitExec { } } - async fn execute( + fn execute( &self, partition: usize, context: Arc, ) -> Result { debug!("Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let stream = self.input.execute(partition, context).await?; + let stream = self.input.execute(partition, context)?; Ok(Box::pin(LimitStream::new( stream, self.limit, @@ -467,7 +464,7 @@ mod tests { GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7); // the result should contain 4 batches (one per input partition) - let iter = limit.execute(0, task_ctx).await?; + let iter = limit.execute(0, task_ctx)?; let batches = common::collect(iter).await?; // there should be a total of 100 rows diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs index 2862aefdb515b..6a0af7650465c 100644 --- a/datafusion/core/src/physical_plan/memory.rs +++ b/datafusion/core/src/physical_plan/memory.rs @@ -33,7 +33,6 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use crate::execution::context::TaskContext; -use async_trait::async_trait; use datafusion_common::DataFusionError; use futures::Stream; @@ -57,7 +56,6 @@ impl fmt::Debug for MemoryExec { } } -#[async_trait] impl ExecutionPlan for MemoryExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -97,7 +95,7 @@ impl ExecutionPlan for MemoryExec { ))) } - async fn execute( + fn execute( &self, partition: usize, _context: Arc, @@ -279,7 +277,7 @@ mod tests { ); // scan with projection - let mut it = executor.execute(0, task_ctx).await?; + let mut it = executor.execute(0, task_ctx)?; let batch2 = it.next().await.unwrap()?; assert_eq!(2, batch2.schema().fields().len()); assert_eq!("c", batch2.schema().field(0).name()); @@ -329,7 +327,7 @@ mod tests { ]) ); - let mut it = executor.execute(0, task_ctx).await?; + let mut it = executor.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; assert_eq!(4, batch1.schema().fields().len()); assert_eq!(4, batch1.num_columns()); diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index dc963c7e1bdc1..feb0bf3226192 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -29,7 +29,6 @@ use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use async_trait::async_trait; pub use datafusion_expr::Accumulator; pub use datafusion_expr::ColumnarValue; pub use display::DisplayFormatType; @@ -128,7 +127,6 @@ pub struct ColumnStatistics { /// [`ExecutionPlan`] can be displayed in an simplified form using the /// return value from [`displayable`] in addition to the (normally /// quite verbose) `Debug` output. -#[async_trait] pub trait ExecutionPlan: Debug + Send + Sync { /// Returns the execution plan as [`Any`](std::any::Any) so that it can be /// downcast to a specific implementation. @@ -223,7 +221,7 @@ pub trait ExecutionPlan: Debug + Send + Sync { ) -> Result>; /// creates an iterator - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -426,13 +424,13 @@ pub async fn execute_stream( ) -> Result { match plan.output_partitioning().partition_count() { 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))), - 1 => plan.execute(0, context).await, + 1 => plan.execute(0, context), _ => { // merge into a single partition let plan = CoalescePartitionsExec::new(plan.clone()); // CoalescePartitionsExec must produce a single partition assert_eq!(1, plan.output_partitioning().partition_count()); - plan.execute(0, context).await + plan.execute(0, context) } } } @@ -458,7 +456,7 @@ pub async fn execute_stream_partitioned( let num_partitions = plan.output_partitioning().partition_count(); let mut streams = Vec::with_capacity(num_partitions); for i in 0..num_partitions { - streams.push(plan.execute(i, context.clone()).await?); + streams.push(plan.execute(i, context.clone())?); } Ok(streams) } diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index b58029eb5916b..85fb7d424fac4 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1510,7 +1510,6 @@ mod tests { logical_plan::LogicalPlanBuilder, physical_plan::SendableRecordBatchStream, }; use arrow::datatypes::{DataType, Field, SchemaRef}; - use async_trait::async_trait; use datafusion_common::{DFField, DFSchema, DFSchemaRef}; use datafusion_expr::sum; use datafusion_expr::{col, lit}; @@ -2000,7 +1999,6 @@ mod tests { schema: SchemaRef, } - #[async_trait] impl ExecutionPlan for NoOpExecutionPlan { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -2034,7 +2032,7 @@ mod tests { unimplemented!("NoOpExecutionPlan::with_new_children"); } - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 554dda12c34a4..8e8a1ee54d3c9 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -39,7 +39,6 @@ use super::expressions::{Column, PhysicalSortExpr}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::execution::context::TaskContext; -use async_trait::async_trait; use futures::stream::Stream; use futures::stream::StreamExt; @@ -102,7 +101,6 @@ impl ProjectionExec { } } -#[async_trait] impl ExecutionPlan for ProjectionExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -146,7 +144,7 @@ impl ExecutionPlan for ProjectionExec { )?)) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -155,7 +153,7 @@ impl ExecutionPlan for ProjectionExec { Ok(Box::pin(ProjectionStream { schema: self.schema.clone(), expr: self.expr.iter().map(|x| x.0.clone()).collect(), - input: self.input.execute(partition, context).await?, + input: self.input.execute(partition, context)?, baseline_metrics: BaselineMetrics::new(&self.metrics, partition), })) } @@ -345,7 +343,7 @@ mod tests { let mut row_count = 0; for partition in 0..projection.output_partitioning().partition_count() { partition_count += 1; - let stream = projection.execute(partition, task_ctx.clone()).await?; + let stream = projection.execute(partition, task_ctx.clone())?; row_count += stream .map(|batch| { diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs index 37955539639fb..82efe2c4fdb46 100644 --- a/datafusion/core/src/physical_plan/repartition.rs +++ b/datafusion/core/src/physical_plan/repartition.rs @@ -37,17 +37,14 @@ use super::common::{AbortOnDropMany, AbortOnDropSingle}; use super::expressions::PhysicalSortExpr; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream}; -use async_trait::async_trait; use crate::execution::context::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::stream::Stream; use futures::StreamExt; use hashbrown::HashMap; -use tokio::sync::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, - Mutex, -}; +use parking_lot::Mutex; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; type MaybeBatch = Option>; @@ -261,7 +258,6 @@ impl RepartitionExec { } } -#[async_trait] impl ExecutionPlan for RepartitionExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -299,7 +295,7 @@ impl ExecutionPlan for RepartitionExec { None } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -309,7 +305,7 @@ impl ExecutionPlan for RepartitionExec { partition ); // lock mutexes - let mut state = self.state.lock().await; + let mut state = self.state.lock(); let num_input_partitions = self.input.output_partitioning().partition_count(); let num_output_partitions = self.partitioning.partition_count(); @@ -437,7 +433,7 @@ impl RepartitionExec { // execute the child operator let timer = r_metrics.fetch_time.timer(); - let mut stream = input.execute(i, context).await?; + let mut stream = input.execute(i, context)?; timer.done(); // While there are still outputs to send to, keep @@ -689,7 +685,7 @@ mod tests { let mut output_partitions = vec![]; for i in 0..exec.partitioning.partition_count() { // execute this *output* partition and collect all batches - let mut stream = exec.execute(i, task_ctx.clone()).await?; + let mut stream = exec.execute(i, task_ctx.clone())?; let mut batches = vec![]; while let Some(result) = stream.next().await { batches.push(result?); @@ -745,7 +741,7 @@ mod tests { // returned and no results produced let partitioning = Partitioning::UnknownPartitioning(1); let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); - let output_stream = exec.execute(0, task_ctx).await.unwrap(); + let output_stream = exec.execute(0, task_ctx).unwrap(); // Expect that an error is returned let result_string = crate::physical_plan::common::collect(output_stream) @@ -773,7 +769,7 @@ mod tests { // Note: this should pass (the stream can be created) but the // error when the input is executed should get passed back - let output_stream = exec.execute(0, task_ctx).await.unwrap(); + let output_stream = exec.execute(0, task_ctx).unwrap(); // Expect that an error is returned let result_string = crate::physical_plan::common::collect(output_stream) @@ -808,7 +804,7 @@ mod tests { // Note: this should pass (the stream can be created) but the // error when the input is executed should get passed back - let output_stream = exec.execute(0, task_ctx).await.unwrap(); + let output_stream = exec.execute(0, task_ctx).unwrap(); // Expect that an error is returned let result_string = crate::physical_plan::common::collect(output_stream) @@ -860,7 +856,7 @@ mod tests { assert_batches_sorted_eq!(&expected, &expected_batches); - let output_stream = exec.execute(0, task_ctx).await.unwrap(); + let output_stream = exec.execute(0, task_ctx).unwrap(); let batches = crate::physical_plan::common::collect(output_stream) .await .unwrap(); @@ -880,8 +876,8 @@ mod tests { // partition into two output streams let exec = RepartitionExec::try_new(input.clone(), partitioning).unwrap(); - let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap(); - let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap(); + let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap(); + let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap(); // now, purposely drop output stream 0 // *before* any outputs are produced @@ -927,7 +923,7 @@ mod tests { // We first collect the results without droping the output stream. let input = Arc::new(make_barrier_exec()); let exec = RepartitionExec::try_new(input.clone(), partitioning.clone()).unwrap(); - let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap(); + let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap(); input.wait().await; let batches_without_drop = crate::physical_plan::common::collect(output_stream1) .await @@ -947,8 +943,8 @@ mod tests { // Now do the same but dropping the stream before waiting for the barrier let input = Arc::new(make_barrier_exec()); let exec = RepartitionExec::try_new(input.clone(), partitioning).unwrap(); - let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap(); - let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap(); + let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap(); + let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap(); // now, purposely drop output stream 0 // *before* any outputs are produced std::mem::drop(output_stream0); @@ -1053,11 +1049,11 @@ mod tests { let schema = batch.schema(); let input = MockExec::new(vec![Ok(batch)], schema); let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); - let output_stream0 = exec.execute(0, task_ctx.clone()).await.unwrap(); + let output_stream0 = exec.execute(0, task_ctx.clone()).unwrap(); let batch0 = crate::physical_plan::common::collect(output_stream0) .await .unwrap(); - let output_stream1 = exec.execute(1, task_ctx.clone()).await.unwrap(); + let output_stream1 = exec.execute(1, task_ctx.clone()).unwrap(); let batch1 = crate::physical_plan::common::collect(output_stream1) .await .unwrap(); diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs b/datafusion/core/src/physical_plan/sort_merge_join.rs index 765f3ba25d3fc..c207917b6b76a 100644 --- a/datafusion/core/src/physical_plan/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/sort_merge_join.rs @@ -32,7 +32,6 @@ use arrow::compute::{take, SortOptions}; use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; -use async_trait::async_trait; use futures::{Stream, StreamExt}; use crate::error::DataFusionError; @@ -112,7 +111,6 @@ impl SortMergeJoinExec { } } -#[async_trait] impl ExecutionPlan for SortMergeJoinExec { fn as_any(&self) -> &dyn Any { self @@ -153,7 +151,7 @@ impl ExecutionPlan for SortMergeJoinExec { } } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -178,8 +176,8 @@ impl ExecutionPlan for SortMergeJoinExec { }; // execute children plans - let streamed = streamed.execute(partition, context.clone()).await?; - let buffered = buffered.execute(partition, context.clone()).await?; + let streamed = streamed.execute(partition, context.clone())?; + let buffered = buffered.execute(partition, context.clone())?; // create output buffer let batch_size = context.session_config().batch_size; @@ -1357,7 +1355,7 @@ mod tests { )?; let columns = columns(&join.schema()); - let stream = join.execute(0, task_ctx).await?; + let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; Ok((columns, batches)) } @@ -1374,7 +1372,7 @@ mod tests { let join = join(left, right, on, join_type)?; let columns = columns(&join.schema()); - let stream = join.execute(0, task_ctx).await?; + let stream = join.execute(0, task_ctx)?; let batches = common::collect(stream).await?; Ok((columns, batches)) } diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index d11a0cf299053..3a0b5f2a16dcf 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -694,7 +694,6 @@ impl SortExec { } } -#[async_trait] impl ExecutionPlan for SortExec { fn as_any(&self) -> &dyn Any { self @@ -748,7 +747,7 @@ impl ExecutionPlan for SortExec { )?)) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -775,7 +774,7 @@ impl ExecutionPlan for SortExec { partition ); - let input = self.input.execute(partition, context.clone()).await?; + let input = self.input.execute(partition, context.clone())?; debug!("End SortExec's input.execute for partition: {}", partition); diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 37ed4acb81ab3..8e3326255cff9 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -35,7 +35,6 @@ use arrow::{ error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; -use async_trait::async_trait; use futures::stream::{Fuse, FusedStream}; use futures::{Stream, StreamExt}; use tokio::sync::mpsc; @@ -108,7 +107,6 @@ impl SortPreservingMergeExec { } } -#[async_trait] impl ExecutionPlan for SortPreservingMergeExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -150,7 +148,7 @@ impl ExecutionPlan for SortPreservingMergeExec { ))) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -182,7 +180,7 @@ impl ExecutionPlan for SortPreservingMergeExec { )), 1 => { // bypass if there is only one partition to merge (no metrics in this case either) - let result = self.input.execute(0, context).await; + let result = self.input.execute(0, context); debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input"); result } @@ -210,20 +208,13 @@ impl ExecutionPlan for SortPreservingMergeExec { ) }) .collect(), - Err(_) => { - futures::future::try_join_all((0..input_partitions).map( - |partition| { - let context = context.clone(); - async move { - self.input - .execute(partition, context) - .await - .map(|stream| SortedStream::new(stream, 0)) - } - }, - )) - .await? - } + Err(_) => (0..input_partitions) + .map(|partition| { + let stream = + self.input.execute(partition, context.clone())?; + Ok(SortedStream::new(stream, 0)) + }) + .collect::>()?, }; debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute"); @@ -1209,7 +1200,7 @@ mod tests { for partition in 0..partition_count { let (sender, receiver) = mpsc::channel(1); - let mut stream = batches.execute(partition, task_ctx.clone()).await.unwrap(); + let mut stream = batches.execute(partition, task_ctx.clone()).unwrap(); let join_handle = tokio::spawn(async move { while let Some(batch) = stream.next().await { sender.send(batch).await.unwrap(); diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index b794cad26ac4e..e89ac4a76d0ee 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -38,7 +38,6 @@ use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, }; -use async_trait::async_trait; /// UNION ALL execution plan #[derive(Debug)] @@ -64,7 +63,6 @@ impl UnionExec { } } -#[async_trait] impl ExecutionPlan for UnionExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -107,7 +105,7 @@ impl ExecutionPlan for UnionExec { Ok(Arc::new(UnionExec::new(children))) } - async fn execute( + fn execute( &self, mut partition: usize, context: Arc, @@ -123,7 +121,7 @@ impl ExecutionPlan for UnionExec { for input in self.inputs.iter() { // Calculate whether partition belongs to the current partition if partition < input.output_partitioning().partition_count() { - let stream = input.execute(partition, context.clone()).await?; + let stream = input.execute(partition, context)?; debug!("Found a Union partition to execute"); return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics))); } else { diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs index f2ba681ed3bda..897936814ceea 100644 --- a/datafusion/core/src/physical_plan/values.rs +++ b/datafusion/core/src/physical_plan/values.rs @@ -29,7 +29,6 @@ use crate::scalar::ScalarValue; use arrow::array::new_null_array; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use async_trait::async_trait; use std::any::Any; use std::sync::Arc; @@ -96,7 +95,6 @@ impl ValuesExec { } } -#[async_trait] impl ExecutionPlan for ValuesExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -138,7 +136,7 @@ impl ExecutionPlan for ValuesExec { })) } - async fn execute( + fn execute( &self, partition: usize, _context: Arc, diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index 8b545a12bb610..e9eac35a3d883 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -33,7 +33,6 @@ use arrow::{ error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; -use async_trait::async_trait; use futures::stream::Stream; use futures::{ready, StreamExt}; use std::any::Any; @@ -90,7 +89,6 @@ impl WindowAggExec { } } -#[async_trait] impl ExecutionPlan for WindowAggExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -148,12 +146,12 @@ impl ExecutionPlan for WindowAggExec { )?)) } - async fn execute( + fn execute( &self, partition: usize, context: Arc, ) -> Result { - let input = self.input.execute(partition, context).await?; + let input = self.input.execute(partition, context)?; let stream = Box::pin(WindowAggStream::new( self.schema.clone(), self.window_expr.clone(), diff --git a/datafusion/core/src/scheduler/mod.rs b/datafusion/core/src/scheduler/mod.rs index a765ddf833844..4cdc34b728e45 100644 --- a/datafusion/core/src/scheduler/mod.rs +++ b/datafusion/core/src/scheduler/mod.rs @@ -76,7 +76,6 @@ use std::sync::Arc; -use futures::stream::BoxStream; use log::{debug, error}; use crate::error::Result; diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs index baf487d98c807..20e7c6e79a48c 100644 --- a/datafusion/core/src/scheduler/pipeline/execution.rs +++ b/datafusion/core/src/scheduler/pipeline/execution.rs @@ -22,9 +22,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, Waker}; -use arrow::error::ArrowError; -use async_trait::async_trait; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt}; use parking_lot::Mutex; use crate::arrow::datatypes::SchemaRef; @@ -39,7 +37,6 @@ use crate::physical_plan::{ }; use crate::scheduler::pipeline::Pipeline; -use crate::scheduler::BoxStream; /// An [`ExecutionPipeline`] wraps a portion of an [`ExecutionPlan`] and /// converts it to the push-based [`Pipeline`] interface @@ -57,7 +54,7 @@ use crate::scheduler::BoxStream; pub struct ExecutionPipeline { proxied: Arc, inputs: Vec>>>, - outputs: Vec>>>, + outputs: Vec>, } impl std::fmt::Debug for ExecutionPipeline { @@ -125,23 +122,8 @@ impl ExecutionPipeline { // Construct the output streams let output_count = proxied.output_partitioning().partition_count(); let outputs = (0..output_count) - .map(|x| { - let proxy_captured = proxied.clone(); - let task_captured = task_context.clone(); - let fut = async move { - proxy_captured - .execute(x, task_captured) - .await - .map_err(|e| ArrowError::ExternalError(Box::new(e))) - }; - - // Use futures::stream::once to handle operators that perform computation - // within `ExecutionPlan::execute`. If we evaluated these futures here - // we could potentially block indefinitely waiting for inputs that will - // never arrive as the query isn't scheduled yet - Mutex::new(futures::stream::once(fut).try_flatten().boxed()) - }) - .collect(); + .map(|x| proxied.execute(x, task_context.clone()).map(Mutex::new)) + .collect::>()?; Ok(Self { proxied, @@ -236,7 +218,6 @@ struct ProxyExecutionPlan { inputs: Vec>>, } -#[async_trait] impl ExecutionPlan for ProxyExecutionPlan { fn as_any(&self) -> &dyn Any { self @@ -281,7 +262,7 @@ impl ExecutionPlan for ProxyExecutionPlan { unimplemented!() } - async fn execute( + fn execute( &self, partition: usize, _context: Arc, diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs index 7e0cbe35f824c..855bb3bbc11e7 100644 --- a/datafusion/core/src/test/exec.rs +++ b/datafusion/core/src/test/exec.rs @@ -17,7 +17,6 @@ //! Simple iterator over batches for use in testing -use async_trait::async_trait; use std::{ any::Any, pin::Pin, @@ -138,7 +137,6 @@ impl MockExec { } } -#[async_trait] impl ExecutionPlan for MockExec { fn as_any(&self) -> &dyn Any { self @@ -168,7 +166,7 @@ impl ExecutionPlan for MockExec { } /// Returns a stream which yields data - async fn execute( + fn execute( &self, partition: usize, _context: Arc, @@ -277,7 +275,6 @@ impl BarrierExec { } } -#[async_trait] impl ExecutionPlan for BarrierExec { fn as_any(&self) -> &dyn Any { self @@ -307,7 +304,7 @@ impl ExecutionPlan for BarrierExec { } /// Returns a stream which yields data - async fn execute( + fn execute( &self, partition: usize, _context: Arc, @@ -378,7 +375,6 @@ impl ErrorExec { } } -#[async_trait] impl ExecutionPlan for ErrorExec { fn as_any(&self) -> &dyn Any { self @@ -408,7 +404,7 @@ impl ExecutionPlan for ErrorExec { } /// Returns a stream which yields data - async fn execute( + fn execute( &self, partition: usize, _context: Arc, @@ -458,7 +454,6 @@ impl StatisticsExec { } } } -#[async_trait] impl ExecutionPlan for StatisticsExec { fn as_any(&self) -> &dyn Any { self @@ -487,7 +482,7 @@ impl ExecutionPlan for StatisticsExec { Ok(self) } - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, @@ -552,7 +547,6 @@ impl BlockingExec { } } -#[async_trait] impl ExecutionPlan for BlockingExec { fn as_any(&self) -> &dyn Any { self @@ -585,7 +579,7 @@ impl ExecutionPlan for BlockingExec { ))) } - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index dbaaca206433e..81e4706deb655 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -98,31 +98,36 @@ impl Stream for TestCustomRecordBatchStream { } } -#[async_trait] impl ExecutionPlan for CustomExecutionPlan { fn as_any(&self) -> &dyn Any { self } + fn schema(&self) -> SchemaRef { let schema = TEST_CUSTOM_SCHEMA_REF!(); project_schema(&schema, self.projection.as_ref()).expect("projected schema") } + fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { None } + fn children(&self) -> Vec> { vec![] } + fn with_new_children( self: Arc, _: Vec>, ) -> Result> { Ok(self) } - async fn execute( + + fn execute( &self, _partition: usize, _context: Arc, diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs index 664e77e182e20..49cd70143b99b 100644 --- a/datafusion/core/tests/provider_filter_pushdown.rs +++ b/datafusion/core/tests/provider_filter_pushdown.rs @@ -55,7 +55,6 @@ struct CustomPlan { batches: Vec>, } -#[async_trait] impl ExecutionPlan for CustomPlan { fn as_any(&self) -> &dyn std::any::Any { self @@ -84,7 +83,7 @@ impl ExecutionPlan for CustomPlan { unreachable!() } - async fn execute( + fn execute( &self, partition: usize, _context: Arc, diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs index d57c218d4864b..0315067047903 100644 --- a/datafusion/core/tests/statistics.rs +++ b/datafusion/core/tests/statistics.rs @@ -106,7 +106,6 @@ impl TableProvider for StatisticsValidation { } } -#[async_trait] impl ExecutionPlan for StatisticsValidation { fn as_any(&self) -> &dyn Any { self @@ -135,7 +134,7 @@ impl ExecutionPlan for StatisticsValidation { Ok(self) } - async fn execute( + fn execute( &self, _partition: usize, _context: Arc, diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index 43e6eeacdb104..d062cf3a37583 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -457,7 +457,7 @@ impl ExecutionPlan for TopKExec { } /// Execute one partition and return an iterator over RecordBatch - async fn execute( + fn execute( &self, partition: usize, context: Arc, @@ -470,7 +470,7 @@ impl ExecutionPlan for TopKExec { } Ok(Box::pin(TopKReader { - input: self.input.execute(partition, context).await?, + input: self.input.execute(partition, context)?, k: self.k, done: false, state: BTreeMap::new(),