diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index caf9b5ebcbdcc..68f8882102f5a 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -20,6 +20,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream}; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; +use crate::execution::memory_pool::MemoryReservation; use crate::physical_plan::metrics::MemTrackingMetrics; use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics}; use arrow::datatypes::{Schema, SchemaRef}; @@ -28,6 +29,7 @@ use arrow::record_batch::RecordBatch; use datafusion_physical_expr::PhysicalSortExpr; use futures::{Future, Stream, StreamExt, TryStreamExt}; use log::debug; +use parking_lot::Mutex; use pin_project_lite::pin_project; use std::fs; use std::fs::{metadata, File}; @@ -37,6 +39,8 @@ use std::task::{Context, Poll}; use tokio::sync::mpsc; use tokio::task::JoinHandle; +pub(crate) type SharedMemoryReservation = Arc>; + /// Stream of record batches pub struct SizedRecordBatchStream { schema: SchemaRef, diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index 522cebe0cf0b0..0523a2e35d259 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -26,6 +26,9 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use crate::execution::context::TaskContext; +use crate::execution::memory_pool::MemoryConsumer; +use crate::physical_plan::common::SharedMemoryReservation; +use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties, @@ -35,12 +38,11 @@ use crate::physical_plan::{ use crate::{error::Result, scalar::ScalarValue}; use async_trait::async_trait; use datafusion_common::DataFusionError; -use log::debug; -use std::time::Instant; +use parking_lot::Mutex; use super::utils::{ - adjust_right_output_partitioning, cross_join_equivalence_properties, OnceAsync, - OnceFut, + adjust_right_output_partitioning, cross_join_equivalence_properties, + BuildProbeJoinMetrics, OnceAsync, OnceFut, }; /// Data of the left side @@ -58,6 +60,8 @@ pub struct CrossJoinExec { schema: SchemaRef, /// Build-side data left_fut: OnceAsync, + /// Execution plan metrics + metrics: ExecutionPlanMetricsSet, } impl CrossJoinExec { @@ -79,6 +83,7 @@ impl CrossJoinExec { right, schema, left_fut: Default::default(), + metrics: ExecutionPlanMetricsSet::default(), } } @@ -97,9 +102,9 @@ impl CrossJoinExec { async fn load_left_input( left: Arc, context: Arc, + metrics: BuildProbeJoinMetrics, + reservation: SharedMemoryReservation, ) -> Result { - let start = Instant::now(); - // merge all left parts into a single stream let merge = { if left.output_partitioning().partition_count() != 1 { @@ -111,22 +116,28 @@ async fn load_left_input( let stream = merge.execute(0, context)?; // Load all batches and count the rows - let (batches, num_rows) = stream - .try_fold((Vec::new(), 0usize), |mut acc, batch| async { - acc.1 += batch.num_rows(); - acc.0.push(batch); - Ok(acc) - }) + let (batches, num_rows, _, _) = stream + .try_fold( + (Vec::new(), 0usize, metrics, reservation), + |mut acc, batch| async { + let batch_size = batch.get_array_memory_size(); + // Reserve memory for incoming batch + acc.3.lock().try_grow(batch_size)?; + // Update metrics + acc.2.build_mem_used.add(batch_size); + acc.2.build_input_batches.add(1); + acc.2.build_input_rows.add(batch.num_rows()); + // Update rowcount + acc.1 += batch.num_rows(); + // Push batch to output + acc.0.push(batch); + Ok(acc) + }, + ) .await?; let merged_batch = concat_batches(&left.schema(), &batches, num_rows)?; - debug!( - "Built build-side of cross join containing {} rows in {} ms", - num_rows, - start.elapsed().as_millis() - ); - Ok(merged_batch) } @@ -143,6 +154,10 @@ impl ExecutionPlan for CrossJoinExec { vec![self.left.clone(), self.right.clone()] } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + /// Specifies whether this plan generates an infinite stream of records. /// If the plan does not support pipelining, but it its input(s) are /// infinite, returns an error to indicate this. @@ -205,9 +220,20 @@ impl ExecutionPlan for CrossJoinExec { ) -> Result { let stream = self.right.execute(partition, context.clone())?; - let left_fut = self - .left_fut - .once(|| load_left_input(self.left.clone(), context)); + let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); + let reservation = Arc::new(Mutex::new( + MemoryConsumer::new(format!("CrossJoinStream[{partition}]")) + .register(context.memory_pool()), + )); + + let left_fut = self.left_fut.once(|| { + load_left_input( + self.left.clone(), + context, + join_metrics.clone(), + reservation.clone(), + ) + }); Ok(Box::pin(CrossJoinStream { schema: self.schema.clone(), @@ -215,11 +241,8 @@ impl ExecutionPlan for CrossJoinExec { right: stream, right_batch: Arc::new(parking_lot::Mutex::new(None)), left_index: 0, - num_input_batches: 0, - num_input_rows: 0, - num_output_batches: 0, - num_output_rows: 0, - join_time: 0, + join_metrics, + reservation, })) } @@ -321,16 +344,10 @@ struct CrossJoinStream { left_index: usize, /// Current batch being processed from the right side right_batch: Arc>>, - /// number of input batches - num_input_batches: usize, - /// number of input rows - num_input_rows: usize, - /// number of batches produced - num_output_batches: usize, - /// number of rows produced - num_output_rows: usize, - /// total time for joining probe-side batches to the build-side batches - join_time: usize, + /// join execution metrics + join_metrics: BuildProbeJoinMetrics, + /// memory reservation + reservation: SharedMemoryReservation, } impl RecordBatchStream for CrossJoinStream { @@ -385,28 +402,30 @@ impl CrossJoinStream { &mut self, cx: &mut std::task::Context<'_>, ) -> std::task::Poll>> { + let build_timer = self.join_metrics.build_time.timer(); let left_data = match ready!(self.left_fut.get(cx)) { Ok(left_data) => left_data, Err(e) => return Poll::Ready(Some(Err(e))), }; + build_timer.done(); if left_data.num_rows() == 0 { return Poll::Ready(None); } if self.left_index > 0 && self.left_index < left_data.num_rows() { - let start = Instant::now(); + let join_timer = self.join_metrics.join_time.timer(); let right_batch = { let right_batch = self.right_batch.lock(); right_batch.clone().unwrap() }; let result = build_batch(self.left_index, &right_batch, left_data, &self.schema); - self.num_input_rows += right_batch.num_rows(); + self.join_metrics.input_rows.add(right_batch.num_rows()); if let Ok(ref batch) = result { - self.join_time += start.elapsed().as_millis() as usize; - self.num_output_batches += 1; - self.num_output_rows += batch.num_rows(); + join_timer.done(); + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); } self.left_index += 1; return Poll::Ready(Some(result)); @@ -416,15 +435,15 @@ impl CrossJoinStream { .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch { Some(Ok(batch)) => { - let start = Instant::now(); + let join_timer = self.join_metrics.join_time.timer(); let result = build_batch(self.left_index, &batch, left_data, &self.schema); - self.num_input_batches += 1; - self.num_input_rows += batch.num_rows(); + self.join_metrics.input_batches.add(1); + self.join_metrics.input_rows.add(batch.num_rows()); if let Ok(ref batch) = result { - self.join_time += start.elapsed().as_millis() as usize; - self.num_output_batches += 1; - self.num_output_rows += batch.num_rows(); + join_timer.done(); + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); } self.left_index = 1; @@ -434,15 +453,7 @@ impl CrossJoinStream { Some(result) } other => { - debug!( - "Processed {} probe-side input batches containing {} rows and \ - produced {} output batches containing {} rows in {} ms", - self.num_input_batches, - self.num_input_rows, - self.num_output_batches, - self.num_output_rows, - self.join_time - ); + self.reservation.lock().free(); other } }) @@ -452,6 +463,26 @@ impl CrossJoinStream { #[cfg(test)] mod tests { use super::*; + use crate::assert_batches_sorted_eq; + use crate::common::assert_contains; + use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use crate::physical_plan::common; + use crate::prelude::{SessionConfig, SessionContext}; + use crate::test::{build_table_scan_i32, columns}; + + async fn join_collect( + left: Arc, + right: Arc, + context: Arc, + ) -> Result<(Vec, Vec)> { + let join = CrossJoinExec::new(left, right); + let columns_header = columns(&join.schema()); + + let stream = join.execute(0, context)?; + let batches = common::collect(stream).await?; + + Ok((columns_header, batches)) + } #[tokio::test] async fn test_stats_cartesian_product() { @@ -589,4 +620,70 @@ mod tests { assert_eq!(result, expected); } + + #[tokio::test] + async fn test_join() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + + let left = build_table_scan_i32( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![4, 5, 6]), + ("c1", &vec![7, 8, 9]), + ); + let right = build_table_scan_i32( + ("a2", &vec![10, 11]), + ("b2", &vec![12, 13]), + ("c2", &vec![14, 15]), + ); + + let (columns, batches) = join_collect(left, right, task_ctx).await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + let expected = vec![ + "+----+----+----+----+----+----+", + "| a1 | b1 | c1 | a2 | b2 | c2 |", + "+----+----+----+----+----+----+", + "| 1 | 4 | 7 | 10 | 12 | 14 |", + "| 1 | 4 | 7 | 11 | 13 | 15 |", + "| 2 | 5 | 8 | 10 | 12 | 14 |", + "| 2 | 5 | 8 | 11 | 13 | 15 |", + "| 3 | 6 | 9 | 10 | 12 | 14 |", + "| 3 | 6 | 9 | 11 | 13 | 15 |", + "+----+----+----+----+----+----+", + ]; + + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + + #[tokio::test] + async fn test_overallocation() -> Result<()> { + let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0); + let runtime = Arc::new(RuntimeEnv::new(runtime_config)?); + let session_ctx = + SessionContext::with_config_rt(SessionConfig::default(), runtime); + let task_ctx = session_ctx.task_ctx(); + + let left = build_table_scan_i32( + ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), + ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), + ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]), + ); + let right = build_table_scan_i32( + ("a2", &vec![10, 11]), + ("b2", &vec![12, 13]), + ("c2", &vec![14, 15]), + ); + + let err = join_collect(left, right, task_ctx).await.unwrap_err(); + + assert_contains!( + err.to_string(), + "External error: Resources exhausted: Failed to allocate additional" + ); + + Ok(()) + } } diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index b01483f560bd6..2f02673159f12 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -20,6 +20,7 @@ use crate::error::{DataFusionError, Result, SharedResult}; use crate::logical_expr::JoinType; use crate::physical_plan::expressions::Column; +use crate::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder}; use crate::physical_plan::SchemaRef; use arrow::array::{ new_null_array, Array, BooleanBufferBuilder, PrimitiveArray, UInt32Array, @@ -968,6 +969,68 @@ pub(crate) fn get_semi_u64_indices( .collect::() } +/// Metrics for build & probe joins +#[derive(Clone, Debug)] +pub(crate) struct BuildProbeJoinMetrics { + /// Total time for collecting build-side of join + pub(crate) build_time: metrics::Time, + /// Number of batches consumed by build-side + pub(crate) build_input_batches: metrics::Count, + /// Number of rows consumed by build-side + pub(crate) build_input_rows: metrics::Count, + /// Memory used by build-side in bytes + pub(crate) build_mem_used: metrics::Gauge, + /// Total time for joining probe-side batches to the build-side batches + pub(crate) join_time: metrics::Time, + /// Number of batches consumed by probe-side of this operator + pub(crate) input_batches: metrics::Count, + /// Number of rows consumed by probe-side this operator + pub(crate) input_rows: metrics::Count, + /// Number of batches produced by this operator + pub(crate) output_batches: metrics::Count, + /// Number of rows produced by this operator + pub(crate) output_rows: metrics::Count, +} + +impl BuildProbeJoinMetrics { + pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self { + let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition); + + let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition); + + let build_input_batches = + MetricBuilder::new(metrics).counter("build_input_batches", partition); + + let build_input_rows = + MetricBuilder::new(metrics).counter("build_input_rows", partition); + + let build_mem_used = + MetricBuilder::new(metrics).gauge("build_mem_used", partition); + + let input_batches = + MetricBuilder::new(metrics).counter("input_batches", partition); + + let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); + + let output_batches = + MetricBuilder::new(metrics).counter("output_batches", partition); + + let output_rows = MetricBuilder::new(metrics).output_rows(partition); + + Self { + build_time, + build_input_batches, + build_input_rows, + build_mem_used, + join_time, + input_batches, + input_rows, + output_batches, + output_rows, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 8a7ce3ce2d658..7f13418d26f70 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -24,7 +24,7 @@ use std::task::{Context, Poll}; use std::{any::Any, vec}; use crate::error::{DataFusionError, Result}; -use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use crate::execution::memory_pool::MemoryConsumer; use crate::physical_plan::hash_utils::create_hashes; use crate::physical_plan::repartition::distributor_channels::channels; use crate::physical_plan::{ @@ -37,7 +37,7 @@ use log::debug; use self::distributor_channels::{DistributionReceiver, DistributionSender}; -use super::common::{AbortOnDropMany, AbortOnDropSingle}; +use super::common::{AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation}; use super::expressions::PhysicalSortExpr; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream}; @@ -53,7 +53,6 @@ use tokio::task::JoinHandle; mod distributor_channels; type MaybeBatch = Option>; -type SharedMemoryReservation = Arc>; /// Inner state of [`RepartitionExec`]. #[derive(Debug)] diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 9dccde8a73457..f7113123cf980 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -26,6 +26,8 @@ use crate::error::Result; use crate::from_slice::FromSlice; use crate::logical_expr::LogicalPlan; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; +use crate::physical_plan::memory::MemoryExec; +use crate::physical_plan::ExecutionPlan; use crate::test::object_store::local_unpartitioned_file; use crate::test_util::{aggr_test_schema, arrow_test_data}; use array::ArrayRef; @@ -207,7 +209,7 @@ pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) { assert_eq!(actual, expected); } -/// returns a table with 3 columns of i32 in memory +/// returns record batch with 3 columns of i32 in memory pub fn build_table_i32( a: (&str, &Vec), b: (&str, &Vec), @@ -230,6 +232,17 @@ pub fn build_table_i32( .unwrap() } +/// returns memory table scan wrapped around record batch with 3 columns of i32 +pub fn build_table_scan_i32( + a: (&str, &Vec), + b: (&str, &Vec), + c: (&str, &Vec), +) -> Arc { + let batch = build_table_i32(a, b, c); + let schema = batch.schema(); + Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap()) +} + /// Returns the column names on the schema pub fn columns(schema: &Schema) -> Vec { schema.fields().iter().map(|f| f.name().clone()).collect() diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs index 170f559038df1..75e26485d2515 100644 --- a/datafusion/core/tests/memory_limit.rs +++ b/datafusion/core/tests/memory_limit.rs @@ -74,6 +74,16 @@ async fn group_by_hash() { .await } +#[tokio::test] +async fn cross_join() { + run_limit_test( + "select t1.* from t t1 CROSS JOIN t t2", + "Resources exhausted: Failed to allocate additional", + 1_000, + ) + .await +} + /// 50 byte memory limit const MEMORY_FRACTION: f64 = 0.95;