diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs index ab74ad37e03e8..1ca21bb88b7fb 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs @@ -195,9 +195,11 @@ mod hash_join_tests { use crate::physical_optimizer::join_selection::swap_join_type; use crate::physical_optimizer::test_utils::SourceType; use crate::physical_plan::expressions::Column; + use crate::physical_plan::joins::PartitionMode; use crate::physical_plan::projection::ProjectionExec; - use crate::{physical_plan::joins::PartitionMode, test::exec::UnboundedExec}; + use crate::test_util::UnboundedExec; use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; use std::sync::Arc; struct TestCase { @@ -529,17 +531,28 @@ mod hash_join_tests { } Ok(()) } + #[allow(clippy::vtable_address_comparisons)] async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> { let left_unbounded = t.initial_sources_unbounded.0 == SourceType::Unbounded; let right_unbounded = t.initial_sources_unbounded.1 == SourceType::Unbounded; let left_exec = Arc::new(UnboundedExec::new( - left_unbounded, - Schema::new(vec![Field::new("a", DataType::Int32, false)]), + (!left_unbounded).then_some(1), + RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Int32, + false, + )]))), + 2, )) as Arc; let right_exec = Arc::new(UnboundedExec::new( - right_unbounded, - Schema::new(vec![Field::new("b", DataType::Int32, false)]), + (!right_unbounded).then_some(1), + RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new( + "b", + DataType::Int32, + false, + )]))), + 2, )) as Arc; let join = HashJoinExec::try_new( diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 072bdfa3e6d04..8a7ce3ce2d658 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -532,6 +532,10 @@ impl RepartitionExec { } timer.done(); } + + // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield. + // See https://github.com/apache/arrow-datafusion/issues/5278. + tokio::task::yield_now().await; } Ok(()) diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs index b191735469e1e..bce7d08a5c563 100644 --- a/datafusion/core/src/test/exec.rs +++ b/datafusion/core/src/test/exec.rs @@ -507,76 +507,6 @@ impl ExecutionPlan for StatisticsExec { } } -/// A mock execution plan that simply returns the provided data source characteristic -#[derive(Debug, Clone)] -pub struct UnboundedExec { - unbounded: bool, - schema: Arc, -} -impl UnboundedExec { - pub fn new(unbounded: bool, schema: Schema) -> Self { - Self { - unbounded, - schema: Arc::new(schema), - } - } -} -impl ExecutionPlan for UnboundedExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(2) - } - - fn unbounded_output(&self, _children: &[bool]) -> Result { - Ok(self.unbounded) - } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn children(&self) -> Vec> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unimplemented!("This plan only serves for testing statistics") - } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - write!(f, "UnboundableExec: unbounded={}", self.unbounded,) - } - } - } - - fn statistics(&self) -> Statistics { - Statistics::default() - } -} - /// Execution plan that emits streams that block forever. /// /// This is useful to test shutdown / cancelation behavior of certain execution plans. diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 669a1784561be..66059d7132117 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -19,17 +19,26 @@ use std::any::Any; use std::collections::HashMap; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{env, error::Error, path::PathBuf, sync::Arc}; use crate::datasource::datasource::TableProviderFactory; use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider}; -use crate::execution::context::SessionState; +use crate::error::Result; +use crate::execution::context::{SessionState, TaskContext}; use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; -use crate::physical_plan::ExecutionPlan; +use crate::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, +}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; -use datafusion_common::DataFusionError; +use datafusion_common::{DataFusionError, Statistics}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; +use datafusion_physical_expr::PhysicalSortExpr; +use futures::Stream; /// Compares formatted output of a record batch with an expected /// vector of strings, with the result of pretty formatting record @@ -332,6 +341,123 @@ impl TableProvider for TestTableProvider { } } +/// A mock execution plan that simply returns the provided data source characteristic +#[derive(Debug, Clone)] +pub struct UnboundedExec { + batch_produce: Option, + batch: RecordBatch, + partitions: usize, +} +impl UnboundedExec { + /// Create new exec that clones the given record batch to its output. + /// + /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition. + pub fn new( + batch_produce: Option, + batch: RecordBatch, + partitions: usize, + ) -> Self { + Self { + batch_produce, + batch, + partitions, + } + } +} +impl ExecutionPlan for UnboundedExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.batch.schema() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(self.partitions) + } + + fn unbounded_output(&self, _children: &[bool]) -> Result { + Ok(self.batch_produce.is_none()) + } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + Ok(Box::pin(UnboundedStream { + batch_produce: self.batch_produce, + count: 0, + batch: self.batch.clone(), + })) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "UnboundableExec: unbounded={}", + self.batch_produce.is_none(), + ) + } + } + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +#[derive(Debug)] +struct UnboundedStream { + batch_produce: Option, + count: usize, + batch: RecordBatch, +} + +impl Stream for UnboundedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Some(val) = self.batch_produce { + if val <= self.count { + return Poll::Ready(None); + } + } + self.count += 1; + Poll::Ready(Some(Ok(self.batch.clone()))) + } +} + +impl RecordBatchStream for UnboundedStream { + fn schema(&self) -> SchemaRef { + self.batch.schema() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/core/tests/repartition.rs b/datafusion/core/tests/repartition.rs new file mode 100644 index 0000000000000..4fd3a91255f78 --- /dev/null +++ b/datafusion/core/tests/repartition.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::UInt32Array; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::{ExecutionPlan, Partitioning}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion::test_util::UnboundedExec; +use datafusion_common::from_slice::FromSlice; +use datafusion_common::Result; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::PhysicalExpr; +use futures::StreamExt; +use std::sync::Arc; + +/// See +#[tokio::test] +async fn unbounded_repartition() -> Result<()> { + let config = SessionConfig::new(); + let ctx = SessionContext::with_config(config); + let task = ctx.task_ctx(); + let schema = Arc::new(Schema::new(vec![Field::new("a2", DataType::UInt32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(UInt32Array::from_slice([1]))], + )?; + let input = Arc::new(UnboundedExec::new(None, batch.clone(), 1)); + let on: Vec> = vec![Arc::new(Column::new("a2", 0))]; + let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?); + let plan = Arc::new(CoalescePartitionsExec::new(plan.clone())); + let mut stream = plan.execute(0, task)?; + + // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a + // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not + // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably). + let batch_actual = stream + .next() + .await + .expect("not terminated") + .expect("no error in stream"); + assert_eq!(batch_actual, batch); + Ok(()) +}