From 042a834251c4cfc6df4019bfaae2f108ea834842 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 16 Feb 2023 12:51:00 +0100 Subject: [PATCH 1/3] fix: add yield point to `RepartitionExec` This prevents endless spinning and locked up tokio tasks if the inputs never yield `pending`. Fixes #5278. --- .../core/src/physical_plan/repartition/mod.rs | 4 + .../core/tests/repartition_exec_blocks.rs | 180 ++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 datafusion/core/tests/repartition_exec_blocks.rs diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 072bdfa3e6d04..8a7ce3ce2d658 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -532,6 +532,10 @@ impl RepartitionExec { } timer.done(); } + + // If the input stream is endless, we may spin forever and never yield back to tokio. Hence let us yield. + // See https://github.com/apache/arrow-datafusion/issues/5278. + tokio::task::yield_now().await; } Ok(()) diff --git a/datafusion/core/tests/repartition_exec_blocks.rs b/datafusion/core/tests/repartition_exec_blocks.rs new file mode 100644 index 0000000000000..72c05fa0b525b --- /dev/null +++ b/datafusion/core/tests/repartition_exec_blocks.rs @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::UInt32Array; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use datafusion::execution::context::TaskContext; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, +}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_common::from_slice::FromSlice; +use datafusion_common::{Result, Statistics}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use futures::{Stream, StreamExt}; +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// A mock execution plan that simply returns the provided data source characteristic +#[derive(Debug, Clone)] +pub struct MyUnboundedExec { + batch_produce: Option, + schema: Arc, + /// Ref-counting helper to check if the plan and the produced stream are still in memory. 
+ refs: Arc<()>, +} +impl MyUnboundedExec { + pub fn new(batch_produce: Option, schema: Schema) -> Self { + Self { + batch_produce, + schema: Arc::new(schema), + refs: Default::default(), + } + } +} +impl ExecutionPlan for MyUnboundedExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + Ok(Box::pin(UnboundedStream { + batch_produce: self.batch_produce, + count: 0, + schema: Arc::clone(&self.schema), + _refs: Arc::clone(&self.refs), + })) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "UnboundableExec: unbounded={}", + self.batch_produce.is_none(), + ) + } + } + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +#[derive(Debug)] +pub struct UnboundedStream { + batch_produce: Option, + count: usize, + /// Schema mocked by this stream. + schema: SchemaRef, + + /// Ref-counting helper to check if the stream are still in memory. + _refs: Arc<()>, +} + +impl Stream for UnboundedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Some(val) = self.batch_produce { + if val <= self.count { + println!("Stream Finished"); + return Poll::Ready(None); + } + } + let batch = RecordBatch::try_new( + self.schema.clone(), + vec![Arc::new(UInt32Array::from_slice([1]))], + )?; + self.count += 1; + Poll::Ready(Some(Ok(batch))) + } +} + +impl RecordBatchStream for UnboundedStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +/// See +#[tokio::test] +async fn unbounded_repartition_sa() -> Result<()> { + let config = SessionConfig::new(); + let ctx = SessionContext::with_config(config); + let task = ctx.task_ctx(); + let schema = Schema::new(vec![Field::new("a2", DataType::UInt32, false)]); + let input = Arc::new(MyUnboundedExec::new(None, schema.clone())); + let on: Vec> = vec![Arc::new(Column::new("a2", 0))]; + let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?); + let plan = Arc::new(CoalescePartitionsExec::new(plan.clone())); + let mut stream = plan.execute(0, task)?; + + // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a + // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not + // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably). 
+ let batch = stream + .next() + .await + .expect("not terminated") + .expect("no error in stream"); + assert_eq!(batch.schema().as_ref(), &schema); + Ok(()) +} From ac3bcefe69d71506c700da6736aa76725ff047ee Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 16 Feb 2023 14:40:07 +0100 Subject: [PATCH 2/3] refactor: use a single `UnboundedExec` for testing --- .../src/physical_optimizer/pipeline_fixer.rs | 23 ++- datafusion/core/src/test/exec.rs | 70 --------- datafusion/core/src/test_util.rs | 132 +++++++++++++++- .../core/tests/repartition_exec_blocks.rs | 148 ++---------------- 4 files changed, 161 insertions(+), 212 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs index ab74ad37e03e8..1ca21bb88b7fb 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_fixer.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_fixer.rs @@ -195,9 +195,11 @@ mod hash_join_tests { use crate::physical_optimizer::join_selection::swap_join_type; use crate::physical_optimizer::test_utils::SourceType; use crate::physical_plan::expressions::Column; + use crate::physical_plan::joins::PartitionMode; use crate::physical_plan::projection::ProjectionExec; - use crate::{physical_plan::joins::PartitionMode, test::exec::UnboundedExec}; + use crate::test_util::UnboundedExec; use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; use std::sync::Arc; struct TestCase { @@ -529,17 +531,28 @@ mod hash_join_tests { } Ok(()) } + #[allow(clippy::vtable_address_comparisons)] async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> { let left_unbounded = t.initial_sources_unbounded.0 == SourceType::Unbounded; let right_unbounded = t.initial_sources_unbounded.1 == SourceType::Unbounded; let left_exec = Arc::new(UnboundedExec::new( - left_unbounded, - Schema::new(vec![Field::new("a", DataType::Int32, false)]), + (!left_unbounded).then_some(1), + RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Int32, + false, + )]))), + 2, )) as Arc; let right_exec = Arc::new(UnboundedExec::new( - right_unbounded, - Schema::new(vec![Field::new("b", DataType::Int32, false)]), + (!right_unbounded).then_some(1), + RecordBatch::new_empty(Arc::new(Schema::new(vec![Field::new( + "b", + DataType::Int32, + false, + )]))), + 2, )) as Arc; let join = HashJoinExec::try_new( diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs index b191735469e1e..bce7d08a5c563 100644 --- a/datafusion/core/src/test/exec.rs +++ b/datafusion/core/src/test/exec.rs @@ -507,76 +507,6 @@ impl ExecutionPlan for StatisticsExec { } } -/// A mock execution plan that simply returns the provided data source characteristic -#[derive(Debug, Clone)] -pub struct UnboundedExec { - unbounded: bool, - schema: Arc, -} -impl UnboundedExec { - pub fn new(unbounded: bool, schema: Schema) -> Self { - Self { - unbounded, - schema: Arc::new(schema), - } - } -} -impl ExecutionPlan for UnboundedExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(2) - } - - fn unbounded_output(&self, _children: &[bool]) -> Result { - Ok(self.unbounded) - } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn children(&self) -> Vec> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Ok(self) - } - - 
fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unimplemented!("This plan only serves for testing statistics") - } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - write!(f, "UnboundableExec: unbounded={}", self.unbounded,) - } - } - } - - fn statistics(&self) -> Statistics { - Statistics::default() - } -} - /// Execution plan that emits streams that block forever. /// /// This is useful to test shutdown / cancelation behavior of certain execution plans. diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 669a1784561be..66059d7132117 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -19,17 +19,26 @@ use std::any::Any; use std::collections::HashMap; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{env, error::Error, path::PathBuf, sync::Arc}; use crate::datasource::datasource::TableProviderFactory; use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider}; -use crate::execution::context::SessionState; +use crate::error::Result; +use crate::execution::context::{SessionState, TaskContext}; use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; -use crate::physical_plan::ExecutionPlan; +use crate::physical_plan::{ + DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, + SendableRecordBatchStream, +}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; use async_trait::async_trait; -use datafusion_common::DataFusionError; +use datafusion_common::{DataFusionError, Statistics}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; +use datafusion_physical_expr::PhysicalSortExpr; +use futures::Stream; /// Compares formatted output of a record batch with an expected /// vector of strings, with the result of pretty formatting record @@ -332,6 +341,123 @@ impl TableProvider for TestTableProvider { } } +/// A mock execution plan that simply returns the provided data source characteristic +#[derive(Debug, Clone)] +pub struct UnboundedExec { + batch_produce: Option, + batch: RecordBatch, + partitions: usize, +} +impl UnboundedExec { + /// Create new exec that clones the given record batch to its output. + /// + /// Set `batch_produce` to `Some(n)` to emit exactly `n` batches per partition. 
+ pub fn new( + batch_produce: Option, + batch: RecordBatch, + partitions: usize, + ) -> Self { + Self { + batch_produce, + batch, + partitions, + } + } +} +impl ExecutionPlan for UnboundedExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.batch.schema() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(self.partitions) + } + + fn unbounded_output(&self, _children: &[bool]) -> Result { + Ok(self.batch_produce.is_none()) + } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + Ok(Box::pin(UnboundedStream { + batch_produce: self.batch_produce, + count: 0, + batch: self.batch.clone(), + })) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "UnboundableExec: unbounded={}", + self.batch_produce.is_none(), + ) + } + } + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +#[derive(Debug)] +struct UnboundedStream { + batch_produce: Option, + count: usize, + batch: RecordBatch, +} + +impl Stream for UnboundedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Some(val) = self.batch_produce { + if val <= self.count { + return Poll::Ready(None); + } + } + self.count += 1; + Poll::Ready(Some(Ok(self.batch.clone()))) + } +} + +impl RecordBatchStream for UnboundedStream { + fn schema(&self) -> SchemaRef { + self.batch.schema() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/core/tests/repartition_exec_blocks.rs b/datafusion/core/tests/repartition_exec_blocks.rs index 72c05fa0b525b..f093cb0073b3e 100644 --- a/datafusion/core/tests/repartition_exec_blocks.rs +++ b/datafusion/core/tests/repartition_exec_blocks.rs @@ -16,143 +16,19 @@ // under the License. use arrow::array::UInt32Array; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; -use datafusion::execution::context::TaskContext; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, -}; +use datafusion::physical_plan::{ExecutionPlan, Partitioning}; use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion::test_util::UnboundedExec; use datafusion_common::from_slice::FromSlice; -use datafusion_common::{Result, Statistics}; +use datafusion_common::Result; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use futures::{Stream, StreamExt}; -use std::any::Any; -use std::pin::Pin; +use datafusion_physical_expr::PhysicalExpr; +use futures::StreamExt; use std::sync::Arc; -use std::task::{Context, Poll}; - -/// A mock execution plan that simply returns the provided data source characteristic -#[derive(Debug, Clone)] -pub struct MyUnboundedExec { - batch_produce: Option, - schema: Arc, - /// Ref-counting helper to check if the plan and the produced stream are still in memory. 
- refs: Arc<()>, -} -impl MyUnboundedExec { - pub fn new(batch_produce: Option, schema: Schema) -> Self { - Self { - batch_produce, - schema: Arc::new(schema), - refs: Default::default(), - } - } -} -impl ExecutionPlan for MyUnboundedExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(1) - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn children(&self) -> Vec> { - vec![] - } - - fn with_new_children( - self: Arc, - _: Vec>, - ) -> Result> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - Ok(Box::pin(UnboundedStream { - batch_produce: self.batch_produce, - count: 0, - schema: Arc::clone(&self.schema), - _refs: Arc::clone(&self.refs), - })) - } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - write!( - f, - "UnboundableExec: unbounded={}", - self.batch_produce.is_none(), - ) - } - } - } - - fn statistics(&self) -> Statistics { - Statistics::default() - } -} - -#[derive(Debug)] -pub struct UnboundedStream { - batch_produce: Option, - count: usize, - /// Schema mocked by this stream. - schema: SchemaRef, - - /// Ref-counting helper to check if the stream are still in memory. - _refs: Arc<()>, -} - -impl Stream for UnboundedStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - if let Some(val) = self.batch_produce { - if val <= self.count { - println!("Stream Finished"); - return Poll::Ready(None); - } - } - let batch = RecordBatch::try_new( - self.schema.clone(), - vec![Arc::new(UInt32Array::from_slice([1]))], - )?; - self.count += 1; - Poll::Ready(Some(Ok(batch))) - } -} - -impl RecordBatchStream for UnboundedStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} /// See #[tokio::test] @@ -160,8 +36,12 @@ async fn unbounded_repartition_sa() -> Result<()> { let config = SessionConfig::new(); let ctx = SessionContext::with_config(config); let task = ctx.task_ctx(); - let schema = Schema::new(vec![Field::new("a2", DataType::UInt32, false)]); - let input = Arc::new(MyUnboundedExec::new(None, schema.clone())); + let schema = Arc::new(Schema::new(vec![Field::new("a2", DataType::UInt32, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(UInt32Array::from_slice([1]))], + )?; + let input = Arc::new(UnboundedExec::new(None, batch.clone(), 1)); let on: Vec> = vec![Arc::new(Column::new("a2", 0))]; let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?); let plan = Arc::new(CoalescePartitionsExec::new(plan.clone())); @@ -170,11 +50,11 @@ async fn unbounded_repartition_sa() -> Result<()> { // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably). 
- let batch = stream + let batch_actual = stream .next() .await .expect("not terminated") .expect("no error in stream"); - assert_eq!(batch.schema().as_ref(), &schema); + assert_eq!(batch_actual, batch); Ok(()) } From ed17f26499d459fd232a3918de55a95d63770279 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 17 Feb 2023 11:05:44 +0100 Subject: [PATCH 3/3] refactor: rename test --- .../core/tests/{repartition_exec_blocks.rs => repartition.rs} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename datafusion/core/tests/{repartition_exec_blocks.rs => repartition.rs} (98%) diff --git a/datafusion/core/tests/repartition_exec_blocks.rs b/datafusion/core/tests/repartition.rs similarity index 98% rename from datafusion/core/tests/repartition_exec_blocks.rs rename to datafusion/core/tests/repartition.rs index f093cb0073b3e..4fd3a91255f78 100644 --- a/datafusion/core/tests/repartition_exec_blocks.rs +++ b/datafusion/core/tests/repartition.rs @@ -32,7 +32,7 @@ use std::sync::Arc; /// See #[tokio::test] -async fn unbounded_repartition_sa() -> Result<()> { +async fn unbounded_repartition() -> Result<()> { let config = SessionConfig::new(); let ctx = SessionContext::with_config(config); let task = ctx.task_ctx();
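
Note on the fix in PATCH 1/3, for readers who want to reproduce the scheduling behavior outside DataFusion: tokio schedules tasks cooperatively, so a task that loops over an input which is always ready never reaches an `.await` that returns `Pending` and therefore never hands its worker thread back to the runtime. The following is a minimal, self-contained sketch of that failure mode and of the `tokio::task::yield_now()` remedy. It is not part of the patches above; it only assumes the `tokio` crate (with the `rt`, `time` and `macros` features enabled), and the helper name `drain_forever` is invented for illustration.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::Duration;

/// Stand-in for the RepartitionExec input loop: the "input" is always ready,
/// so the loop body on its own never awaits anything that returns `Pending`.
async fn drain_forever(progress: Arc<AtomicU64>, cooperative: bool) {
    loop {
        // Pretend we pulled a batch, hashed it and sent it to the output channels.
        progress.fetch_add(1, Ordering::Relaxed);

        if cooperative {
            // The one-line fix from PATCH 1/3: hand control back to the tokio
            // scheduler so other tasks on this worker thread can make progress.
            tokio::task::yield_now().await;
        }
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let progress = Arc::new(AtomicU64::new(0));

    // With `cooperative = false` this task monopolizes the single worker thread
    // and the `sleep` below is never polled to completion; with `true` both
    // tasks interleave and the program terminates normally.
    tokio::spawn(drain_forever(Arc::clone(&progress), true));

    tokio::time::sleep(Duration::from_millis(10)).await;
    println!(
        "loop iterations while the rest of the runtime kept running: {}",
        progress.load(Ordering::Relaxed)
    );
}

This is also why the regression test above sticks to the default single-threaded `#[tokio::test]` runtime instead of relying on `tokio::time::timeout`: once the spinning task owns the only worker thread, the timeout future itself is never polled, so the hang is only reliably observable on a single-threaded runtime.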