diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index dd194263d6bc7..79535497d4ed3 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -21,7 +21,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; use crate::physical_plan::metrics::MemTrackingMetrics; -use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; +use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics}; use arrow::compute::concat; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; @@ -29,6 +29,7 @@ use arrow::error::Result as ArrowResult; use arrow::ipc::writer::FileWriter; use arrow::record_batch::RecordBatch; use futures::{Future, Stream, StreamExt, TryStreamExt}; +use log::debug; use pin_project_lite::pin_project; use std::fs; use std::fs::{metadata, File}; @@ -185,6 +186,10 @@ pub(crate) fn spawn_execution( // there is no place to send the error. let arrow_error = ArrowError::ExternalError(Box::new(e)); output.send(Err(arrow_error)).await.ok(); + debug!( + "Stopping execution: error executing input: {}", + displayable(input.as_ref()).one_line() + ); return; } Ok(stream) => stream, @@ -193,7 +198,11 @@ pub(crate) fn spawn_execution( while let Some(item) = stream.next().await { // If send fails, plan being torn down, // there is no place to send the error. - if let Err(_) = output.send(item).await { + if output.send(item).await.is_err() { + debug!( + "Stopping execution: output is gone, plan cancelling: {}", + displayable(input.as_ref()).one_line() + ); return; } }