diff --git a/Cargo.lock b/Cargo.lock index 933a0ee44a76d..6bb74db810353 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1999,6 +1999,7 @@ dependencies = [ "clap 4.5.47", "ctor", "datafusion", + "datafusion-execution", "dirs", "env_logger", "futures", @@ -2212,6 +2213,7 @@ dependencies = [ "bytes", "dashmap", "datafusion", + "datafusion-common", "datafusion-ffi", "datafusion-physical-expr-adapter", "datafusion-proto", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 394adbb6105f3..7c0c3342c2ba2 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -53,6 +53,7 @@ datafusion = { workspace = true, features = [ "unicode_expressions", "compression", ] } +datafusion-execution = { workspace = true } dirs = "6.0.0" env_logger = { workspace = true } futures = { workspace = true } diff --git a/datafusion-cli/src/cli_context.rs b/datafusion-cli/src/cli_context.rs index 516929ebacf19..88a36bd2c0fe4 100644 --- a/datafusion-cli/src/cli_context.rs +++ b/datafusion-cli/src/cli_context.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use datafusion::{ dataframe::DataFrame, error::DataFusionError, - execution::{context::SessionState, TaskContext}, + execution::{context::SessionState, memory_pool::TrackedPool, TaskContext}, logical_expr::LogicalPlan, prelude::SessionContext, }; @@ -28,6 +28,30 @@ use object_store::ObjectStore; use crate::object_storage::{AwsOptions, GcpOptions}; +/// Registers table option extensions based on the provided URL scheme. +/// +/// Supported schemes are: +/// * `s3`, `oss`, `cos` - registers [`AwsOptions`] +/// * `gs`, `gcs` - registers [`GcpOptions`] +/// +/// Any other scheme is ignored. +pub fn register_table_options_from_scheme(ctx: &SessionContext, scheme: &str) { + match scheme { + // For Amazon S3 or Alibaba Cloud OSS + "s3" | "oss" | "cos" => { + // Register AWS specific table options in the session context: + ctx.register_table_options_extension(AwsOptions::default()) + } + // For Google Cloud Storage + "gs" | "gcs" => { + // Register GCP specific table options in the session context: + ctx.register_table_options_extension(GcpOptions::default()) + } + // For unsupported schemes, do nothing: + _ => {} + } +} + #[async_trait::async_trait] /// The CLI session context trait provides a way to have a session context that can be used with datafusion's CLI code. pub trait CliSessionContext { @@ -52,6 +76,19 @@ pub trait CliSessionContext { &self, plan: LogicalPlan, ) -> Result; + + /// Return true if memory profiling is enabled. + fn memory_profiling(&self) -> bool { + false + } + + /// Enable or disable memory profiling. + fn set_memory_profiling(&self, _enable: bool) {} + + /// Return the tracked memory pool used for profiling, if any. + fn tracked_memory_pool(&self) -> Option> { + None + } } #[async_trait::async_trait] @@ -73,26 +110,82 @@ impl CliSessionContext for SessionContext { } fn register_table_options_extension_from_scheme(&self, scheme: &str) { - match scheme { - // For Amazon S3 or Alibaba Cloud OSS - "s3" | "oss" | "cos" => { - // Register AWS specific table options in the session context: - self.register_table_options_extension(AwsOptions::default()) - } - // For Google Cloud Storage - "gs" | "gcs" => { - // Register GCP specific table options in the session context: - self.register_table_options_extension(GcpOptions::default()) - } - // For unsupported schemes, do nothing: - _ => {} + register_table_options_from_scheme(self, scheme); + } + + async fn execute_logical_plan( + &self, + plan: LogicalPlan, + ) -> Result { + SessionContext::execute_logical_plan(self, plan).await + } +} + +/// Session context used by the CLI with memory profiling support. +pub struct ReplSessionContext { + ctx: SessionContext, + tracked_memory_pool: Option>, +} + +impl ReplSessionContext { + pub fn new( + ctx: SessionContext, + tracked_memory_pool: Option>, + ) -> Self { + Self { + ctx, + tracked_memory_pool, } } +} + +#[async_trait::async_trait] +impl CliSessionContext for ReplSessionContext { + fn task_ctx(&self) -> Arc { + self.ctx.task_ctx() + } + + fn session_state(&self) -> SessionState { + self.ctx.state() + } + + fn register_object_store( + &self, + url: &url::Url, + object_store: Arc, + ) -> Option> { + self.ctx.register_object_store(url, object_store) + } + + fn register_table_options_extension_from_scheme(&self, scheme: &str) { + register_table_options_from_scheme(&self.ctx, scheme); + } async fn execute_logical_plan( &self, plan: LogicalPlan, ) -> Result { - self.execute_logical_plan(plan).await + self.ctx.execute_logical_plan(plan).await + } + + fn memory_profiling(&self) -> bool { + self.tracked_memory_pool + .as_ref() + .map(|pool| pool.tracking_enabled()) + .unwrap_or(false) + } + + fn set_memory_profiling(&self, enable: bool) { + if let Some(pool) = &self.tracked_memory_pool { + if enable { + pool.enable_tracking(); + } else { + pool.disable_tracking(); + } + } + } + + fn tracked_memory_pool(&self) -> Option> { + self.tracked_memory_pool.clone() } } diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index 77bc8d3d20003..0f93079686dad 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -46,6 +46,7 @@ pub enum Command { SearchFunctions(String), QuietMode(Option), OutputFormat(Option), + MemoryProfiling, } pub enum OutputFormat { @@ -110,6 +111,15 @@ impl Command { } Ok(()) } + Self::MemoryProfiling => { + let enable = !ctx.memory_profiling(); + ctx.set_memory_profiling(enable); + println!( + "Memory profiling {}", + if enable { "enabled" } else { "disabled" } + ); + Ok(()) + } Self::Quit => exec_err!("Unexpected quit, this should be handled outside"), Self::ListFunctions => display_all_functions(), Self::SearchFunctions(function) => { @@ -142,11 +152,15 @@ impl Command { Self::OutputFormat(_) => { ("\\pset [NAME [VALUE]]", "set table output option\n(format)") } + Self::MemoryProfiling => ( + "\\memory_profiling", + "toggle memory profiling (requires --top-memory-consumers N at startup for metrics)", + ), } } } -const ALL_COMMANDS: [Command; 9] = [ +const ALL_COMMANDS: [Command; 10] = [ Command::ListTables, Command::DescribeTableStmt(String::new()), Command::Quit, @@ -156,6 +170,7 @@ const ALL_COMMANDS: [Command; 9] = [ Command::SearchFunctions(String::new()), Command::QuietMode(None), Command::OutputFormat(None), + Command::MemoryProfiling, ]; fn all_commands_info() -> RecordBatch { @@ -206,6 +221,8 @@ impl FromStr for Command { Self::OutputFormat(Some(subcommand.to_string())) } ("pset", None) => Self::OutputFormat(None), + ("memory_profiling", None) => Self::MemoryProfiling, + ("memory_profiling", Some(_)) => return Err(()), _ => return Err(()), }) } diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index eb7174dbbd6f2..2ce3f26f61324 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -19,6 +19,7 @@ use crate::cli_context::CliSessionContext; use crate::helper::split_from_semicolon; +use crate::memory_metrics::format_metrics; use crate::print_format::PrintFormat; use crate::{ command::{Command, OutputFormat}, @@ -313,6 +314,12 @@ impl StatementExecutor { )?; reservation.free(); } + if ctx.memory_profiling() { + if let Some(pool) = ctx.tracked_memory_pool() { + let metrics = pool.consumer_metrics(); + println!("{}", format_metrics(&metrics)); + } + } Ok(()) } diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs index 34fba6f79304b..0819fa1b6f14f 100644 --- a/datafusion-cli/src/lib.rs +++ b/datafusion-cli/src/lib.rs @@ -30,6 +30,7 @@ pub mod exec; pub mod functions; pub mod helper; pub mod highlighter; +pub mod memory_metrics; pub mod object_storage; pub mod pool_type; pub mod print_format; diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index a28e97a9f88ec..5db070903c36c 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -25,7 +25,8 @@ use std::sync::{Arc, LazyLock}; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{ - FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, + FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, TrackedPool, + UnboundedMemoryPool, }; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::prelude::SessionContext; @@ -39,6 +40,8 @@ use datafusion_cli::{ DATAFUSION_CLI_VERSION, }; +use datafusion_cli::cli_context::ReplSessionContext; + use clap::Parser; use datafusion::common::config_err; use datafusion::config::ConfigOptions; @@ -123,7 +126,7 @@ struct Args { #[clap( long, help = "The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0", - default_value = "3" + default_value = "5" )] top_memory_consumers: usize, @@ -174,28 +177,30 @@ async fn main_inner() -> Result<()> { let session_config = get_session_config(&args)?; let mut rt_builder = RuntimeEnvBuilder::new(); - // set memory pool size - if let Some(memory_limit) = args.memory_limit { - // set memory pool type - let pool: Arc = match args.mem_pool_type { - PoolType::Fair if args.top_memory_consumers == 0 => { - Arc::new(FairSpillPool::new(memory_limit)) - } - PoolType::Fair => Arc::new(TrackConsumersPool::new( - FairSpillPool::new(memory_limit), - NonZeroUsize::new(args.top_memory_consumers).unwrap(), - )), - PoolType::Greedy if args.top_memory_consumers == 0 => { - Arc::new(GreedyMemoryPool::new(memory_limit)) + + // set memory pool type + let base_memory_pool: Arc = + if let Some(memory_limit) = args.memory_limit { + match args.mem_pool_type { + PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)), + PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)), } - PoolType::Greedy => Arc::new(TrackConsumersPool::new( - GreedyMemoryPool::new(memory_limit), - NonZeroUsize::new(args.top_memory_consumers).unwrap(), - )), + } else { + Arc::new(UnboundedMemoryPool::default()) }; - rt_builder = rt_builder.with_memory_pool(pool) - } + let tracked_pool: Option> = if args.top_memory_consumers > 0 { + let tracked = Arc::new(TrackConsumersPool::new( + base_memory_pool.clone(), + NonZeroUsize::new(args.top_memory_consumers).unwrap(), + )); + tracked.disable_tracking(); + rt_builder = rt_builder.with_memory_pool(tracked.clone()); + Some(tracked as Arc) + } else { + rt_builder = rt_builder.with_memory_pool(base_memory_pool); + None + }; // set disk limit if let Some(disk_limit) = args.disk_limit { @@ -208,24 +213,25 @@ async fn main_inner() -> Result<()> { let runtime_env = rt_builder.build_arc()?; // enable dynamic file query - let ctx = SessionContext::new_with_config_rt(session_config, runtime_env) + let session_ctx = SessionContext::new_with_config_rt(session_config, runtime_env) .enable_url_table(); - ctx.refresh_catalogs().await?; + session_ctx.refresh_catalogs().await?; // install dynamic catalog provider that can register required object stores - ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new( - ctx.state().catalog_list().clone(), - ctx.state_weak_ref(), + session_ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new( + session_ctx.state().catalog_list().clone(), + session_ctx.state_weak_ref(), ))); // register `parquet_metadata` table function to get metadata from parquet files - ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {})); - + session_ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {})); // register `metadata_cache` table function to get the contents of the file metadata cache - ctx.register_udtf( + session_ctx.register_udtf( "metadata_cache", Arc::new(MetadataCacheFunc::new( - ctx.task_ctx().runtime_env().cache_manager.clone(), + session_ctx.task_ctx().runtime_env().cache_manager.clone(), )), ); + // wrap the SessionContext in a REPL context (adds profiling, top consumers, etc.) + let ctx = ReplSessionContext::new(session_ctx, tracked_pool); let mut print_options = PrintOptions { format: args.format, diff --git a/datafusion-cli/src/memory_metrics.rs b/datafusion-cli/src/memory_metrics.rs new file mode 100644 index 0000000000000..f08c303dfcedd --- /dev/null +++ b/datafusion-cli/src/memory_metrics.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for formatting memory usage metrics. + +use datafusion::execution::memory_pool::{ + human_readable_size, operator_category, ConsumerMemoryMetrics, +}; +use std::{collections::BTreeMap, fmt::Write}; + +/// Format summary of memory usage metrics. +/// +/// Returns a string with peak usage, cumulative allocations, and totals per +/// operator category. The caller is responsible for printing the returned +/// string if desired. +pub fn format_metrics(metrics: &[ConsumerMemoryMetrics]) -> String { + if metrics.is_empty() { + return "no memory metrics recorded".to_string(); + } + + let peak = metrics.iter().map(|m| m.peak).max().unwrap_or(0); + let cumulative: usize = metrics.iter().map(|m| m.cumulative).sum(); + + let mut s = String::new(); + let _ = writeln!(s, "Peak memory usage: {}", human_readable_size(peak)); + let _ = writeln!( + s, + "Cumulative allocations: {}", + human_readable_size(cumulative) + ); + + let mut by_op: BTreeMap<&str, usize> = BTreeMap::new(); + for m in metrics { + let category = operator_category(&m.name); + *by_op.entry(category).or_default() += m.cumulative; + } + + let _ = writeln!(s, "Memory usage by operator:"); + for (op, bytes) in by_op { + let _ = writeln!(s, "{op}: {}", human_readable_size(bytes)); + } + s +} diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs index 125771acb30ea..dbe14505b0278 100644 --- a/datafusion-cli/tests/cli_integration.rs +++ b/datafusion-cli/tests/cli_integration.rs @@ -37,10 +37,30 @@ fn make_settings() -> Settings { settings.set_prepend_module_to_snapshot(false); settings.add_filter(r"Elapsed .* seconds\.", "[ELAPSED]"); settings.add_filter(r"DataFusion CLI v.*", "[CLI_VERSION]"); - settings.add_filter(r"(?s)backtrace:.*?\n\n\n", ""); + settings.add_filter(r"(?s)backtrace:.*", ""); settings } +// Common insta filters for tests that include memory profiling output. +fn add_memory_filters(settings: &mut Settings) { + // Loosen memory profiling output: replace dynamic byte counts and categories with placeholders + // Match values like 'Peak memory usage: 10.0 MB' or 'Peak memory usage: 1024 B' + settings.add_filter(r"Peak memory usage: .*?B", "Peak memory usage: XB"); + settings.add_filter( + r"Cumulative allocations: .*?B", + "Cumulative allocations: XB", + ); + settings.add_filter(r"Other: .*?B", "Other: XB"); + settings.add_filter(r"Sorting: .*?B", "Sorting: XB"); + + settings.add_filter(r"Memory usage by operator:", "Memory usage by operator:"); + + // Fallback: allow uncaptured lines to appear as-is by capturing the whole line + // and replacing with the captured group. This ensures we don't accidentally + // hide content that other filters should not change. + settings.add_filter(r"^(.*)$", "$1"); +} + async fn setup_minio_container() -> ContainerAsync { const MINIO_ROOT_USER: &str = "TEST-DataFusionLogin"; const MINIO_ROOT_PASSWORD: &str = "TEST-DataFusionPassword"; @@ -218,9 +238,11 @@ fn test_cli_top_memory_consumers<'a>( settings.set_snapshot_suffix(snapshot_name); + // Match consumer lines like `ExternalSorterMerge[0]#2(can spill: false) consumed 10.0 MB` + // or with an optional peak part: `... consumed 10.0 MB, peak 12.0 MB`. settings.add_filter( - r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B, peak .*?B", - "Consumer(can spill: bool) consumed XB, peak XB", + r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B(?:, peak .*?B)?", + "Consumer(can spill: bool) consumed XB", ); settings.add_filter( r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool", @@ -230,15 +252,57 @@ fn test_cli_top_memory_consumers<'a>( r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool", "Resources exhausted: Failed to allocate", ); + add_memory_filters(&mut settings); let _bound = settings.bind_to_scope(); let mut cmd = cli(); - let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;"; - cmd.args(["--memory-limit", "10M", "--command", sql]); - cmd.args(top_memory_consumers); + cmd.arg("-q") + .args(["--memory-limit", "10M"]) + .args(top_memory_consumers); - assert_cmd_snapshot!(cmd); + let input = "\ +\\memory_profiling +select * from generate_series(1,500000) as t1(v1) order by v1; +"; + + assert_cmd_snapshot!(cmd.pass_stdin(input)); +} + +#[test] +fn cli_memory_auto_report() { + let mut settings = make_settings(); + settings.set_snapshot_suffix("memory_auto_report"); + // Add common memory-related filters + add_memory_filters(&mut settings); + let _bound = settings.bind_to_scope(); + + let input = "\ + \\memory_profiling + select 1; + select * from generate_series(1,10000) as t1(v1) order by v1; + \\q + "; + + assert_cmd_snapshot!(cli().arg("-q").pass_stdin(input)); +} + +#[test] +fn cli_memory_disable_stops_report() { + let mut settings = make_settings(); + settings.set_snapshot_suffix("memory_disable_stops_report"); + // Add common memory-related filters + add_memory_filters(&mut settings); + let _bound = settings.bind_to_scope(); + + let input = "\ +\\memory_profiling +select 1; +\\memory_profiling +select 1; +"; + + assert_cmd_snapshot!(cli().arg("-q").pass_stdin(input)); } #[tokio::test] diff --git a/datafusion-cli/tests/snapshots/cli_memory_auto_report@memory_auto_report.snap b/datafusion-cli/tests/snapshots/cli_memory_auto_report@memory_auto_report.snap new file mode 100644 index 0000000000000..da68499b3c4db --- /dev/null +++ b/datafusion-cli/tests/snapshots/cli_memory_auto_report@memory_auto_report.snap @@ -0,0 +1,80 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +assertion_line: 267 +info: + program: datafusion-cli + args: + - "-q" + stdin: "\\memory_profiling\n select 1;\n select * from generate_series(1,10000) as t1(v1) order by v1;\n \\q\n " +--- +success: true +exit_code: 0 +----- stdout ----- +Memory profiling enabled ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ +Peak memory usage: XB +Cumulative allocations: XB +Memory usage by operator: +Other: XB + ++----+ +| v1 | ++----+ +| 1 | +| 2 | +| 3 | +| 4 | +| 5 | +| 6 | +| 7 | +| 8 | +| 9 | +| 10 | +| 11 | +| 12 | +| 13 | +| 14 | +| 15 | +| 16 | +| 17 | +| 18 | +| 19 | +| 20 | +| 21 | +| 22 | +| 23 | +| 24 | +| 25 | +| 26 | +| 27 | +| 28 | +| 29 | +| 30 | +| 31 | +| 32 | +| 33 | +| 34 | +| 35 | +| 36 | +| 37 | +| 38 | +| 39 | +| 40 | +| . | +| . | +| . | ++----+ +Peak memory usage: XB +Cumulative allocations: XB +Memory usage by operator: +Other: XB +Sorting: XB + +\q + +----- stderr ----- + diff --git a/datafusion-cli/tests/snapshots/cli_memory_disable_stops_report@memory_disable_stops_report.snap b/datafusion-cli/tests/snapshots/cli_memory_disable_stops_report@memory_disable_stops_report.snap new file mode 100644 index 0000000000000..190d8a0417dce --- /dev/null +++ b/datafusion-cli/tests/snapshots/cli_memory_disable_stops_report@memory_disable_stops_report.snap @@ -0,0 +1,31 @@ +--- +source: datafusion-cli/tests/cli_integration.rs +info: + program: datafusion-cli + args: + - "-q" + stdin: "\\memory_profiling\nselect 1;\n\\memory_profiling\nselect 1;\n" +--- +success: true +exit_code: 0 +----- stdout ----- +Memory profiling enabled ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ +Peak memory usage: XB +Cumulative allocations: XB +Memory usage by operator: +Other: XB + +Memory profiling disabled ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ +\q + +----- stderr ----- diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap index 89b646a531f8b..de35385daf5ff 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@no_track.snap @@ -3,19 +3,20 @@ source: datafusion-cli/tests/cli_integration.rs info: program: datafusion-cli args: + - "-q" - "--memory-limit" - 10M - - "--command" - - "select * from generate_series(1,500000) as t1(v1) order by v1;" - "--top-memory-consumers" - "0" + stdin: "\\memory_profiling\nselect * from generate_series(1,500000) as t1(v1) order by v1;\n" --- -success: false -exit_code: 1 +success: true +exit_code: 0 ----- stdout ----- -[CLI_VERSION] -Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes -caused by -Resources exhausted: Failed to allocate +Memory profiling enabled +\q ----- stderr ----- +Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes +caused by +Resources exhausted: Failed to allocate diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap index a49d7c9755f6e..bd7a193bb524c 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap @@ -3,22 +3,23 @@ source: datafusion-cli/tests/cli_integration.rs info: program: datafusion-cli args: + - "-q" - "--memory-limit" - 10M - - "--command" - - "select * from generate_series(1,500000) as t1(v1) order by v1;" - "--top-memory-consumers" - "2" + stdin: "\\memory_profiling\nselect * from generate_series(1,500000) as t1(v1) order by v1;\n" --- -success: false -exit_code: 1 +success: true +exit_code: 0 ----- stdout ----- -[CLI_VERSION] -Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes -caused by -Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - Consumer(can spill: bool) consumed XB, peak XB, - Consumer(can spill: bool) consumed XB, peak XB. -Error: Failed to allocate +Memory profiling enabled +\q ----- stderr ----- +Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes +caused by +Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Consumer(can spill: bool) consumed XB, + Consumer(can spill: bool) consumed XB. +Error: Failed to allocate diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap index 7416f5cf6bc5d..aa9e1dbed543d 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap @@ -3,21 +3,22 @@ source: datafusion-cli/tests/cli_integration.rs info: program: datafusion-cli args: + - "-q" - "--memory-limit" - 10M - - "--command" - - "select * from generate_series(1,500000) as t1(v1) order by v1;" + stdin: "\\memory_profiling\nselect * from generate_series(1,500000) as t1(v1) order by v1;\n" --- -success: false -exit_code: 1 +success: true +exit_code: 0 ----- stdout ----- -[CLI_VERSION] -Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes -caused by -Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - Consumer(can spill: bool) consumed XB, peak XB, - Consumer(can spill: bool) consumed XB, peak XB, - Consumer(can spill: bool) consumed XB, peak XB. -Error: Failed to allocate +Memory profiling enabled +\q ----- stderr ----- +Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes +caused by +Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: + Consumer(can spill: bool) consumed XB, + Consumer(can spill: bool) consumed XB, + Consumer(can spill: bool) consumed XB. +Error: Failed to allocate diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 68bb5376a1acc..ddf8c7885aa45 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -32,6 +32,9 @@ rust-version = { workspace = true } [lints] workspace = true +[dependencies] +datafusion-common = { workspace = true } + [[example]] name = "flight_sql_server" path = "examples/flight/flight_sql_server.rs" diff --git a/datafusion-examples/examples/memory_profiling.rs b/datafusion-examples/examples/memory_profiling.rs new file mode 100644 index 0000000000000..8228a463ad91f --- /dev/null +++ b/datafusion-examples/examples/memory_profiling.rs @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Demonstrates how to track and report memory usage of a query. +//! +//! This example enables per-consumer memory tracking while running a query. +//! It collects detailed metrics for each memory consumer (for example, operators +//! or execution stages), including peak memory and cumulative allocation bytes, +//! and then prints a short summary that aggregates metrics by operator +//! category. +//! +//! Run with `cargo run --example memory_profiling`. + +use std::{collections::BTreeMap, fmt::Write, num::NonZeroUsize, sync::Arc}; + +use datafusion::execution::memory_pool::{ + human_readable_size, operator_category, ConsumerMemoryMetrics, GreedyMemoryPool, + TrackConsumersPool, +}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::prelude::*; + +/// Format summary of memory usage metrics. +fn format_metrics(metrics: &[ConsumerMemoryMetrics]) -> String { + if metrics.is_empty() { + return "no memory metrics recorded".to_string(); + } + + let peak = metrics.iter().map(|m| m.peak).max().unwrap_or(0); + let cumulative: usize = metrics.iter().map(|m| m.cumulative).sum(); + + let mut s = String::new(); + let _ = writeln!(s, "Peak memory usage: {}", human_readable_size(peak)); + let _ = writeln!( + s, + "Cumulative allocations: {}", + human_readable_size(cumulative) + ); + + let mut by_op: BTreeMap<&str, usize> = BTreeMap::new(); + for m in metrics { + let category = operator_category(&m.name); + *by_op.entry(category).or_default() += m.cumulative; + } + + let _ = writeln!(s, "Memory usage by operator:"); + for (op, bytes) in by_op { + let _ = writeln!(s, "{op}: {}", human_readable_size(bytes)); + } + s +} + +#[tokio::main] +async fn main() -> datafusion::error::Result<()> { + // Create a session context with a tracked memory pool + let pool = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(usize::MAX), + NonZeroUsize::new(5).unwrap(), + )); + pool.enable_tracking(); + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(pool.clone() as Arc<_>) + .build_arc()?; + let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime); + + let sql = "SELECT v % 100 AS group_key, COUNT(*) AS cnt, SUM(v) AS sum_v \ + FROM generate_series(1,100000) AS t(v) \ + GROUP BY group_key \ + ORDER BY group_key"; + + // Execute the query; collecting results forces execution + let df = ctx.sql(sql).await?; + df.collect().await?; + + // Gather metrics and disable tracking + let metrics = pool.consumer_metrics(); + pool.disable_tracking(); + + // Print memory usage summary + println!("{}", format_metrics(&metrics)); + + Ok(()) +} diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index e620b23267962..b8f19916baf2d 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -20,7 +20,7 @@ use datafusion_common::{internal_err, Result}; use std::hash::{Hash, Hasher}; -use std::{cmp::Ordering, sync::atomic, sync::Arc}; +use std::{cmp::Ordering, fmt, sync::atomic, sync::Arc}; mod pool; pub mod proxy { @@ -78,7 +78,7 @@ pub use pool::*; /// /// Scenario 1: /// For `Filter` operator, `RecordBatch`es will stream through it, so it -/// don't have to keep track of memory usage through [`MemoryPool`]. +/// doesn't have to keep track of memory usage through [`MemoryPool`]. /// /// Scenario 2: /// For `CrossJoin` operator, if the input size gets larger, the intermediate @@ -176,7 +176,7 @@ pub use pool::*; /// /// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers, /// providing better error messages on the largest memory users. -pub trait MemoryPool: Send + Sync + std::fmt::Debug { +pub trait MemoryPool: Send + Sync + fmt::Debug { /// Registers a new [`MemoryConsumer`] /// /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory @@ -223,6 +223,37 @@ pub enum MemoryLimit { Unknown, } +/// Implement MemoryPool for `Arc` +impl MemoryPool for Arc { + fn register(&self, consumer: &MemoryConsumer) { + (**self).register(consumer) + } + + fn unregister(&self, consumer: &MemoryConsumer) { + (**self).unregister(consumer) + } + + fn grow(&self, reservation: &MemoryReservation, additional: usize) { + (**self).grow(reservation, additional) + } + + fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { + (**self).shrink(reservation, shrink) + } + + fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { + (**self).try_grow(reservation, additional) + } + + fn reserved(&self) -> usize { + (**self).reserved() + } + + fn memory_limit(&self) -> MemoryLimit { + (**self).memory_limit() + } +} + /// A memory consumer is a named allocation traced by a particular /// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to /// a particular `MemoryConsumer`; @@ -322,6 +353,7 @@ impl MemoryConsumer { consumer: self, }), size: 0, + peak: 0, } } } @@ -351,6 +383,7 @@ impl Drop for SharedRegistration { pub struct MemoryReservation { registration: Arc, size: usize, + peak: usize, } impl MemoryReservation { @@ -358,6 +391,10 @@ impl MemoryReservation { pub fn size(&self) -> usize { self.size } + /// Returns the peak size of this reservation in bytes + pub fn peak(&self) -> usize { + self.peak + } /// Returns [MemoryConsumer] for this [MemoryReservation] pub fn consumer(&self) -> &MemoryConsumer { @@ -409,6 +446,9 @@ impl MemoryReservation { Ordering::Less => self.shrink(self.size - capacity), _ => {} } + if self.size > self.peak { + self.peak = self.size; + } } /// Try to set the size of this reservation to `capacity` @@ -418,6 +458,9 @@ impl MemoryReservation { Ordering::Less => self.shrink(self.size - capacity), _ => {} }; + if self.size > self.peak { + self.peak = self.size; + } Ok(()) } @@ -425,6 +468,9 @@ impl MemoryReservation { pub fn grow(&mut self, capacity: usize) { self.registration.pool.grow(self, capacity); self.size += capacity; + if self.size > self.peak { + self.peak = self.size; + } } /// Try to increase the size of this reservation by `capacity` @@ -433,6 +479,9 @@ impl MemoryReservation { pub fn try_grow(&mut self, capacity: usize) -> Result<()> { self.registration.pool.try_grow(self, capacity)?; self.size += capacity; + if self.size > self.peak { + self.peak = self.size; + } Ok(()) } @@ -451,6 +500,7 @@ impl MemoryReservation { Self { size: capacity, registration: Arc::clone(&self.registration), + peak: capacity, } } @@ -459,6 +509,7 @@ impl MemoryReservation { Self { size: 0, registration: Arc::clone(&self.registration), + peak: 0, } } @@ -475,6 +526,19 @@ impl Drop for MemoryReservation { } } +impl fmt::Display for MemoryReservation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}#{} reserved {} (peak {})", + self.consumer().name(), + self.consumer().id(), + human_readable_size(self.size()), + human_readable_size(self.peak()) + ) + } +} + pub mod units { pub const TB: u64 = 1 << 40; pub const GB: u64 = 1 << 30; @@ -503,6 +567,43 @@ pub fn human_readable_size(size: usize) -> String { format!("{value:.1} {unit}") } +/// Categorize operator names into high-level groups for reporting. +const OPERATOR_CATEGORIES: &[(&str, &str)] = &[ + ("parquet", "Parquet"), + ("csv", "CSV"), + ("json", "JSON"), + ("coalesce", "Coalesce"), + ("repart", "Repartition"), + ("shuffle", "Shuffle"), + ("exchange", "Network Shuffle"), + ("scan", "Data Input"), + ("filter", "Filtering"), + ("join", "Join Operation"), + ("nested_loop", "Nested Loop Join"), + ("sort_merge", "Sort Merge Join"), + ("hash", "Hash Aggregate"), + ("aggregate", "Aggregation"), + ("sort", "Sorting"), + ("project", "Projection"), + ("union", "Set Operation"), + ("window", "Window Function"), + ("limit", "Limit/TopK"), + ("top", "Limit/TopK"), + ("distinct", "Distinct"), + ("spill", "Memory Management"), +]; + +/// Return a human-friendly category for an operator name. +pub fn operator_category(name: &str) -> &'static str { + let name = name.to_lowercase(); + for (pat, cat) in OPERATOR_CATEGORIES { + if name.contains(pat) { + return cat; + } + } + "Other" +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index da456b7071f77..71b28b7a0baf5 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -24,7 +24,7 @@ use log::debug; use parking_lot::Mutex; use std::{ num::NonZeroUsize, - sync::atomic::{AtomicUsize, Ordering}, + sync::atomic::{AtomicBool, AtomicUsize, Ordering}, }; /// A [`MemoryPool`] that enforces no limit @@ -268,7 +268,11 @@ fn insufficient_capacity_err( struct TrackedConsumer { name: String, can_spill: bool, + /// Currently reserved bytes for this consumer reserved: AtomicUsize, + /// Total bytes ever allocated by this consumer + cumulative: AtomicUsize, + /// Peak reserved bytes for this consumer peak: AtomicUsize, } @@ -278,16 +282,13 @@ impl TrackedConsumer { self.reserved.load(Ordering::Relaxed) } - /// Return the peak value - fn peak(&self) -> usize { - self.peak.load(Ordering::Relaxed) - } - /// Grows the tracked consumer's reserved size, /// should be called after the pool has successfully performed the grow(). fn grow(&self, additional: usize) { - self.reserved.fetch_add(additional, Ordering::Relaxed); - self.peak.fetch_max(self.reserved(), Ordering::Relaxed); + let new_reserved = + self.reserved.fetch_add(additional, Ordering::Relaxed) + additional; + self.cumulative.fetch_add(additional, Ordering::Relaxed); + self.peak.fetch_max(new_reserved, Ordering::Relaxed); } /// Reduce the tracked consumer's reserved size, @@ -297,6 +298,32 @@ impl TrackedConsumer { } } +/// Snapshot of tracked memory metrics for a [`MemoryConsumer`] +#[derive(Debug, Clone)] +pub struct ConsumerMemoryMetrics { + pub id: usize, + pub name: String, + pub can_spill: bool, + pub reserved: usize, + pub cumulative: usize, + pub peak: usize, +} + +/// Trait for memory pools that support tracking memory consumers +pub trait TrackedPool: Send + Sync { + /// Enable tracking and reset any existing metrics + fn enable_tracking(&self); + + /// Disable tracking of consumers + fn disable_tracking(&self); + + /// Return true if tracking is enabled + fn tracking_enabled(&self) -> bool; + + /// Returns a snapshot of the metrics for all tracked consumers + fn consumer_metrics(&self) -> Vec; +} + /// A [`MemoryPool`] that tracks the consumers that have /// reserved memory within the inner memory pool. /// @@ -329,6 +356,8 @@ pub struct TrackConsumersPool { top: NonZeroUsize, /// Maps consumer_id --> TrackedConsumer tracked_consumers: Mutex>, + /// Whether tracking is enabled + tracking_enabled: AtomicBool, } impl TrackConsumersPool { @@ -371,9 +400,37 @@ impl TrackConsumersPool { inner, top, tracked_consumers: Default::default(), + tracking_enabled: AtomicBool::new(true), } } + /// Enable tracking and reset any existing metrics + pub fn enable_tracking(&self) { + self.tracking_enabled.store(true, Ordering::Relaxed); + self.tracked_consumers.lock().clear(); + } + + /// Disable tracking of consumers + pub fn disable_tracking(&self) { + self.tracking_enabled.store(false, Ordering::Relaxed); + } + + /// Returns a snapshot of the metrics for all tracked consumers + pub fn consumer_metrics(&self) -> Vec { + self.tracked_consumers + .lock() + .iter() + .map(|(id, consumer)| ConsumerMemoryMetrics { + id: *id, + name: consumer.name.clone(), + can_spill: consumer.can_spill, + reserved: consumer.reserved.load(Ordering::Relaxed), + cumulative: consumer.cumulative.load(Ordering::Relaxed), + peak: consumer.peak.load(Ordering::Relaxed), + }) + .collect() + } + /// Returns a formatted string with the top memory consumers. pub fn report_top(&self, top: usize) -> String { let mut consumers = self @@ -386,7 +443,6 @@ impl TrackConsumersPool { *consumer_id, tracked_consumer.name.to_owned(), tracked_consumer.can_spill, - tracked_consumer.peak(), ), tracked_consumer.reserved(), ) @@ -396,63 +452,100 @@ impl TrackConsumersPool { consumers[0..std::cmp::min(top, consumers.len())] .iter() - .map(|((id, name, can_spill, peak), size)| { + .map(|((id, name, can_spill), size)| { format!( - " {name}#{id}(can spill: {can_spill}) consumed {}, peak {}", - human_readable_size(*size), - human_readable_size(*peak), + " {name}#{id}(can spill: {can_spill}) consumed {}", + human_readable_size(*size) ) }) .collect::>() .join(",\n") + "." } + + /// Return true if tracking is currently enabled + pub fn tracking_enabled(&self) -> bool { + self.tracking_enabled.load(Ordering::Relaxed) + } +} + +impl TrackedPool for TrackConsumersPool { + fn enable_tracking(&self) { + TrackConsumersPool::enable_tracking(self); + } + + fn disable_tracking(&self) { + TrackConsumersPool::disable_tracking(self); + } + + fn tracking_enabled(&self) -> bool { + TrackConsumersPool::tracking_enabled(self) + } + + fn consumer_metrics(&self) -> Vec { + TrackConsumersPool::consumer_metrics(self) + } } impl MemoryPool for TrackConsumersPool { fn register(&self, consumer: &MemoryConsumer) { self.inner.register(consumer); - let mut guard = self.tracked_consumers.lock(); - let existing = guard.insert( - consumer.id(), - TrackedConsumer { - name: consumer.name().to_string(), - can_spill: consumer.can_spill(), - reserved: Default::default(), - peak: Default::default(), - }, - ); - - debug_assert!( - existing.is_none(), - "Registered was called twice on the same consumer" - ); + if self.tracking_enabled.load(Ordering::Relaxed) { + let mut guard = self.tracked_consumers.lock(); + let existing = guard.insert( + consumer.id(), + TrackedConsumer { + name: consumer.name().to_string(), + can_spill: consumer.can_spill(), + reserved: Default::default(), + cumulative: Default::default(), + peak: Default::default(), + }, + ); + + debug_assert!( + existing.is_none(), + "Registered was called twice on the same consumer", + ); + } } fn unregister(&self, consumer: &MemoryConsumer) { self.inner.unregister(consumer); - self.tracked_consumers.lock().remove(&consumer.id()); + if self.tracking_enabled.load(Ordering::Relaxed) { + let guard = self.tracked_consumers.lock(); + if let Some(tracked) = guard.get(&consumer.id()) { + let reserved = tracked.reserved(); + if reserved > 0 { + tracked.shrink(reserved); + } + } + } } fn grow(&self, reservation: &MemoryReservation, additional: usize) { self.inner.grow(reservation, additional); - self.tracked_consumers - .lock() - .entry(reservation.consumer().id()) - .and_modify(|tracked_consumer| { - tracked_consumer.grow(additional); - }); + if self.tracking_enabled.load(Ordering::Relaxed) { + self.tracked_consumers + .lock() + .entry(reservation.consumer().id()) + .and_modify(|tracked_consumer| { + tracked_consumer.grow(additional); + }); + } } fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { self.inner.shrink(reservation, shrink); - self.tracked_consumers - .lock() - .entry(reservation.consumer().id()) - .and_modify(|tracked_consumer| { - tracked_consumer.shrink(shrink); - }); + if self.tracking_enabled.load(Ordering::Relaxed) { + self.tracked_consumers + .lock() + .entry(reservation.consumer().id()) + .and_modify(|tracked_consumer| { + tracked_consumer.shrink(shrink); + }); + } } fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { @@ -471,12 +564,14 @@ impl MemoryPool for TrackConsumersPool { _ => e, })?; - self.tracked_consumers - .lock() - .entry(reservation.consumer().id()) - .and_modify(|tracked_consumer| { - tracked_consumer.grow(additional); - }); + if self.tracking_enabled.load(Ordering::Relaxed) { + self.tracked_consumers + .lock() + .entry(reservation.consumer().id()) + .and_modify(|tracked_consumer| { + tracked_consumer.grow(additional); + }); + } Ok(()) } @@ -488,7 +583,6 @@ impl MemoryPool for TrackConsumersPool { self.inner.memory_limit() } } - fn provide_top_memory_consumers_to_error_msg( error_msg: String, top_consumers: String, @@ -591,8 +685,7 @@ mod tests { // set r1=50, using grow and shrink let mut r1 = MemoryConsumer::new("r1").register(&pool); - r1.grow(50); - r1.grow(20); + r1.grow(70); r1.shrink(20); // set r2=15 using try_grow @@ -620,9 +713,9 @@ mod tests { let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B, - r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B, - r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B. + r1#[ID](can spill: false) consumed 50.0 B, + r3#[ID](can spill: false) consumed 20.0 B, + r2#[ID](can spill: false) consumed 15.0 B. Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool "); } @@ -645,7 +738,7 @@ mod tests { let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B. + foo#[ID](can spill: false) consumed 0.0 B. Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool "); @@ -662,8 +755,8 @@ mod tests { let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B, - foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B. + foo#[ID](can spill: false) consumed 10.0 B, + foo#[ID](can spill: false) consumed 0.0 B. Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool "); @@ -675,8 +768,8 @@ mod tests { let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, - foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + foo#[ID](can spill: false) consumed 20.0 B, + foo#[ID](can spill: false) consumed 10.0 B. Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool "); @@ -690,9 +783,9 @@ mod tests { let error = res.unwrap_err().strip_backtrace(); assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, - foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B, - foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B. + foo#[ID](can spill: false) consumed 20.0 B, + foo#[ID](can spill: false) consumed 10.0 B, + foo#[ID](can spill: true) consumed 0.0 B. Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool "); } @@ -714,20 +807,21 @@ mod tests { let error = res.unwrap_err().strip_backtrace(); allow_duplicates!(assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B, - r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + r1#[ID](can spill: false) consumed 20.0 B, + r0#[ID](can spill: false) consumed 10.0 B. Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool ")); // Test: unregister one - // only the remaining one should be listed + // the unregistered consumer remains with 0 usage drop(r1); let res = r0.try_grow(150); assert!(res.is_err()); let error = res.unwrap_err().strip_backtrace(); allow_duplicates!(assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + r0#[ID](can spill: false) consumed 10.0 B, + r1#[ID](can spill: false) consumed 0.0 B. Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool ")); @@ -738,7 +832,8 @@ mod tests { let error = res.unwrap_err().strip_backtrace(); allow_duplicates!(assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + r0#[ID](can spill: false) consumed 10.0 B, + r1#[ID](can spill: false) consumed 0.0 B. Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool ")); @@ -749,7 +844,8 @@ mod tests { let error = res.unwrap_err().strip_backtrace(); allow_duplicates!(assert_snapshot!(error, @r" Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: - r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B. + r0#[ID](can spill: false) consumed 10.0 B, + r1#[ID](can spill: false) consumed 0.0 B. Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool ")); } @@ -796,8 +892,8 @@ mod tests { // Test: can get runtime metrics, even without an error thrown let res = downcasted.report_top(2); assert_snapshot!(res, @r" - r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B, - r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B. + r3#[ID](can spill: false) consumed 45.0 B, + r1#[ID](can spill: false) consumed 20.0 B. "); } } diff --git a/docs/source/user-guide/cli/usage.md b/docs/source/user-guide/cli/usage.md index 263728f5b04d4..37cb8f59ead0f 100644 --- a/docs/source/user-guide/cli/usage.md +++ b/docs/source/user-guide/cli/usage.md @@ -58,7 +58,7 @@ OPTIONS: Specify the memory pool type 'greedy' or 'fair', default to 'greedy' --top-memory-consumers - The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0 [default: 3] + The number of top memory consumers to display when query fails due to memory exhaustion. To disable memory consumer tracking, set this value to 0 [default: 5] -d, --disk-limit Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g') @@ -122,6 +122,36 @@ Available commands inside DataFusion CLI are: > \h function ``` +- Memory profiling + +> **Tip:** Memory profiling requires the tracked pool. Start the CLI with `--top-memory-consumers N` (N≥1), or profiling will report no metrics. By default, the CLI starts with `--top-memory-consumers 5`. + +Memory profiling is disabled by default. Run `\memory_profiling` to enable it; a usage report will print automatically after each subsequent query. Run `\memory_profiling` again to disable profiling. + +Example usage: + +```text +> \memory_profiling +Memory profiling enabled +> SELECT v % 100 AS group_key, COUNT(*) AS cnt, SUM(v) AS sum_v FROM generate_series(1,100000) AS t(v) GROUP BY group_key ORDER BY group_key; + ++-----------+------+----------+ +| group_key | cnt | sum_v | ++-----------+------+----------+ +| 0 | 1000 | 50050000 | +| 1 | 1000 | 49951000 | +| 2 | 1000 | 49952000 | +... +Peak memory usage: 10.0 MB +Cumulative allocations: 101.6 MB +Memory usage by operator: +Aggregation: 762.2 KB +Repartition: 884.8 KB +Sorting: 100.0 MB + +\memory_profiling # disable +``` + ## Supported SQL In addition to the normal [SQL supported in DataFusion], `datafusion-cli` also