Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
469 changes: 469 additions & 0 deletions datafusion/common/src/format.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ impl SessionState {
stringified_plans,
schema: Arc::clone(&e.schema),
logical_optimization_succeeded: false,
show_statistics: e.show_statistics,
}));
}
Err(e) => return Err(e),
Expand Down Expand Up @@ -719,6 +720,7 @@ impl SessionState {
stringified_plans,
schema: Arc::clone(&e.schema),
logical_optimization_succeeded,
show_statistics: e.show_statistics,
}))
} else {
let analyzed_plan = self.analyzer.execute_and_check(
Expand Down
30 changes: 19 additions & 11 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2528,6 +2528,8 @@ impl DefaultPhysicalPlanner {

let config = &session_state.config_options().explain;
let explain_format = &e.explain_format;
// Statement-level override wins over session config for show_statistics.
let show_statistics = e.show_statistics.unwrap_or(config.show_statistics);

if !e.logical_optimization_succeeded {
return Ok(Arc::new(ExplainExec::new(
Expand Down Expand Up @@ -2600,7 +2602,7 @@ impl DefaultPhysicalPlanner {
stringified_plans.push(StringifiedPlan::new(
InitialPhysicalPlan,
displayable(input.as_ref())
.set_show_statistics(config.show_statistics)
.set_show_statistics(show_statistics)
.set_show_schema(config.show_schema)
.indent(e.verbose)
.to_string(),
Expand All @@ -2609,7 +2611,7 @@ impl DefaultPhysicalPlanner {
// Show statistics + schema in verbose output even if not
// explicitly requested
if e.verbose {
if !config.show_statistics {
if !show_statistics {
stringified_plans.push(StringifiedPlan::new(
InitialPhysicalPlanWithStats,
displayable(input.as_ref())
Expand Down Expand Up @@ -2638,7 +2640,7 @@ impl DefaultPhysicalPlanner {
stringified_plans.push(StringifiedPlan::new(
plan_type,
displayable(plan)
.set_show_statistics(config.show_statistics)
.set_show_statistics(show_statistics)
.set_show_schema(config.show_schema)
.indent(e.verbose)
.to_string(),
Expand All @@ -2651,7 +2653,7 @@ impl DefaultPhysicalPlanner {
stringified_plans.push(StringifiedPlan::new(
FinalPhysicalPlan,
displayable(input.as_ref())
.set_show_statistics(config.show_statistics)
.set_show_statistics(show_statistics)
.set_show_schema(config.show_schema)
.indent(e.verbose)
.to_string(),
Expand All @@ -2660,7 +2662,7 @@ impl DefaultPhysicalPlanner {
// Show statistics + schema in verbose output even if not
// explicitly requested
if e.verbose {
if !config.show_statistics {
if !show_statistics {
stringified_plans.push(StringifiedPlan::new(
FinalPhysicalPlanWithStats,
displayable(input.as_ref())
Expand Down Expand Up @@ -2714,13 +2716,18 @@ impl DefaultPhysicalPlanner {
let input = self.create_physical_plan(&a.input, session_state).await?;
let schema = Arc::clone(a.schema.inner());
let show_statistics = session_state.config_options().explain.show_statistics;
let analyze_level = session_state.config_options().explain.analyze_level;
// Statement-level overrides take precedence over the session config.
let analyze_level = a
.analyze_level
.unwrap_or(session_state.config_options().explain.analyze_level);
let metric_types = analyze_level.included_types();
let analyze_categories = session_state
.config_options()
.explain
.analyze_categories
.clone();
let analyze_categories = a.analyze_categories.clone().unwrap_or_else(|| {
session_state
.config_options()
.explain
.analyze_categories
.clone()
});
let metric_categories = match analyze_categories {
ExplainAnalyzeCategories::All => None,
ExplainAnalyzeCategories::Only(cats) => Some(cats),
Expand Down Expand Up @@ -3844,6 +3851,7 @@ mod tests {
stringified_plans,
schema: schema.to_dfschema_ref().unwrap(),
logical_optimization_succeeded: false,
show_statistics: None,
};
let plan = planner
.handle_explain(&explain, &ctx.state())
Expand Down
112 changes: 112 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1267,3 +1267,115 @@ async fn explain_analyze_categories() {
);
}
}

/// Builds a [`SessionContext`] whose SQL parser is set to the PostgreSQL
/// dialect, so that the `EXPLAIN (option, ...)` utility-option syntax is
/// accepted by the tests below.
fn session_ctx_with_pg_dialect() -> SessionContext {
    use std::str::FromStr;

    let mut config = SessionConfig::new();
    // The dialect name is a fixed literal, so a parse failure here is a bug
    // in the test itself rather than a recoverable condition.
    let pg_dialect = datafusion::config::Dialect::from_str("PostgreSQL").unwrap();
    config.options_mut().sql_parser.dialect = pg_dialect;
    SessionContext::new_with_config(config)
}

/// Plans and executes `sql` on `ctx`, then renders every collected record
/// batch as a single pretty-printed table string.
///
/// Panics if planning, execution, or formatting fails; this is a test helper.
async fn collect_explain(ctx: &SessionContext, sql: &str) -> String {
    let batches = ctx.sql(sql).await.unwrap().collect().await.unwrap();
    let rendered = arrow::util::pretty::pretty_format_batches(&batches).unwrap();
    rendered.to_string()
}

/// Checks that the Postgres-style `EXPLAIN (METRICS '...')` option filters
/// metric categories the same way as setting
/// `datafusion.explain.analyze_categories` does.
#[tokio::test]
async fn explain_analyze_paren_metrics_filtering() {
    const SQL: &str = "EXPLAIN (ANALYZE, METRICS 'rows') \
         SELECT * FROM generate_series(10) as t1(v1) ORDER BY v1 DESC";

    let ctx = session_ctx_with_pg_dialect();
    let plan = collect_explain(&ctx, SQL).await;

    // The requested category must be present ...
    assert!(
        plan.contains("output_rows"),
        "rows category should include output_rows:\n{plan}"
    );
    // ... and metrics from other categories must be filtered out.
    assert!(
        !plan.contains("elapsed_compute"),
        "rows-only METRICS should exclude elapsed_compute:\n{plan}"
    );
    assert!(
        !plan.contains("output_bytes"),
        "rows-only METRICS should exclude output_bytes:\n{plan}"
    );
}

/// Checks that a statement-level `METRICS` option takes precedence over the
/// `analyze_categories` value stored in the session config.
#[tokio::test]
async fn explain_analyze_paren_metrics_overrides_session_config() {
    const SQL: &str = "EXPLAIN (ANALYZE, METRICS 'bytes') \
         SELECT * FROM generate_series(10) as t1(v1) ORDER BY v1 DESC";

    let ctx = session_ctx_with_pg_dialect();

    // Session default: restrict output to the `rows` category via config.
    // The write guard is scoped so it is released before the query runs.
    {
        let shared_state = ctx.state_ref();
        let mut guard = shared_state.write();
        let explain_options = &mut guard.config_mut().options_mut().explain;
        explain_options.analyze_categories =
            ExplainAnalyzeCategories::Only(vec![MetricCategory::Rows]);
    }

    // The statement asks for 'bytes', so `output_bytes` must appear while
    // `output_rows` (the session-config category) must not. Neither metric
    // name is a substring of the other, so plain `contains` checks cannot
    // produce a false match.
    let plan = collect_explain(&ctx, SQL).await;
    assert!(
        plan.contains("output_bytes"),
        "statement-level METRICS='bytes' should show output_bytes:\n{plan}"
    );
    assert!(
        !plan.contains("output_rows"),
        "statement-level METRICS='bytes' should hide output_rows:\n{plan}"
    );
}

/// Checks that `EXPLAIN (ANALYZE, LEVEL summary)` limits output to summary
/// metrics even when the session config sets the level to `dev`.
#[tokio::test]
async fn explain_analyze_paren_level_overrides_session_config() {
    const SQL: &str = "EXPLAIN (ANALYZE, LEVEL summary) \
         SELECT * FROM generate_series(10) as t1(v1) ORDER BY v1 DESC";

    let ctx = session_ctx_with_pg_dialect();

    // Session default: Dev. The write guard is scoped so it is released
    // before the query runs.
    {
        let shared_state = ctx.state_ref();
        let mut guard = shared_state.write();
        let explain_options = &mut guard.config_mut().options_mut().explain;
        explain_options.analyze_level = MetricType::Dev;
    }

    let plan = collect_explain(&ctx, SQL).await;

    // `spill_count` is Dev-only; `output_rows` is Summary.
    assert!(
        plan.contains("output_rows"),
        "summary should still show output_rows:\n{plan}"
    );
    assert!(
        !plan.contains("spill_count"),
        "summary should hide Dev-only spill_count:\n{plan}"
    );
}

/// Checks that `EXPLAIN (ANALYZE, BUFFERS)` fails at planning time with an
/// error that names the option and states that it is not supported.
#[tokio::test]
async fn explain_paren_buffers_rejected() {
    let ctx = session_ctx_with_pg_dialect();

    let outcome = ctx.sql("EXPLAIN (ANALYZE, BUFFERS) SELECT 1").await;
    let msg = outcome.unwrap_err().to_string();

    assert!(
        msg.contains("BUFFERS"),
        "error should mention BUFFERS: {msg}"
    );
    assert!(
        msg.contains("not supported"),
        "error should say not supported: {msg}"
    );
}
3 changes: 3 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,8 @@ impl LogicalPlanBuilder {
verbose: explain_option.verbose,
input: self.plan,
schema,
analyze_level: None,
analyze_categories: None,
})))
} else {
let stringified_plans =
Expand All @@ -1341,6 +1343,7 @@ impl LogicalPlanBuilder {
stringified_plans,
schema,
logical_optimization_succeeded: false,
show_statistics: explain_option.show_statistics,
})))
}
}
Expand Down
34 changes: 32 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::{

use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
use datafusion_common::cse::{NormalizeEq, Normalizeable};
use datafusion_common::format::ExplainFormat;
use datafusion_common::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType};
use datafusion_common::metadata::check_metadata_with_storage_equal;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
Expand Down Expand Up @@ -1094,6 +1094,8 @@ impl LogicalPlan {
verbose: a.verbose,
schema: Arc::clone(&a.schema),
input: Arc::new(input),
analyze_level: a.analyze_level,
analyze_categories: a.analyze_categories.clone(),
}))
}
LogicalPlan::Explain(e) => {
Expand All @@ -1106,6 +1108,7 @@ impl LogicalPlan {
stringified_plans: e.stringified_plans.clone(),
schema: Arc::clone(&e.schema),
logical_optimization_succeeded: e.logical_optimization_succeeded,
show_statistics: e.show_statistics,
}))
}
LogicalPlan::Statement(Statement::Prepare(Prepare {
Expand Down Expand Up @@ -3205,6 +3208,9 @@ pub struct ExplainOption {
pub analyze: bool,
/// Output syntax/format
pub format: ExplainFormat,
/// Statement-level override for `datafusion.explain.show_statistics`.
/// `None` means "fall back to session config".
pub show_statistics: Option<bool>,
}

impl Default for ExplainOption {
Expand All @@ -3213,6 +3219,7 @@ impl Default for ExplainOption {
verbose: false,
analyze: false,
format: ExplainFormat::Indent,
show_statistics: None,
}
}
}
Expand All @@ -3235,6 +3242,13 @@ impl ExplainOption {
self.format = format;
self
}

/// Builder-style setter for a statement-level override of
/// `datafusion.explain.show_statistics`.
pub fn with_show_statistics(mut self, show_statistics: Option<bool>) -> Self {
self.show_statistics = show_statistics;
self
}
}

/// Produces a relation with string representations of
Expand All @@ -3258,6 +3272,9 @@ pub struct Explain {
pub schema: DFSchemaRef,
/// Used by physical planner to check if should proceed with planning
pub logical_optimization_succeeded: bool,
/// Statement-level override for `datafusion.explain.show_statistics`.
/// When `None`, the session-config value is used.
pub show_statistics: Option<bool>,
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
Expand All @@ -3273,18 +3290,22 @@ impl PartialOrd for Explain {
pub stringified_plans: &'a Vec<StringifiedPlan>,
/// Used by physical planner to check if should proceed with planning
pub logical_optimization_succeeded: &'a bool,
/// Statement-level override for show_statistics
pub show_statistics: &'a Option<bool>,
}
let comparable_self = ComparableExplain {
verbose: &self.verbose,
plan: &self.plan,
stringified_plans: &self.stringified_plans,
logical_optimization_succeeded: &self.logical_optimization_succeeded,
show_statistics: &self.show_statistics,
};
let comparable_other = ComparableExplain {
verbose: &other.verbose,
plan: &other.plan,
stringified_plans: &other.stringified_plans,
logical_optimization_succeeded: &other.logical_optimization_succeeded,
show_statistics: &other.show_statistics,
};
comparable_self
.partial_cmp(&comparable_other)
Expand All @@ -3303,9 +3324,18 @@ pub struct Analyze {
pub input: Arc<LogicalPlan>,
/// The output schema of the explain (2 columns of text)
pub schema: DFSchemaRef,
/// Statement-level override for `datafusion.explain.analyze_level`.
/// When `None`, the session-config value is used.
pub analyze_level: Option<MetricType>,
/// Statement-level override for `datafusion.explain.analyze_categories`.
/// When `None`, the session-config value is used.
pub analyze_categories: Option<ExplainAnalyzeCategories>,
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
// Manual implementation needed because of `schema` field and the lack of
// `PartialOrd` on `MetricType` / `ExplainAnalyzeCategories`. Ordering is
// defined over `(verbose, input)` and then falls back to `==` for the
// remaining statement-level override fields.
impl PartialOrd for Analyze {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.verbose.partial_cmp(&other.verbose) {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ impl TreeNode for LogicalPlan {
stringified_plans,
schema,
logical_optimization_succeeded,
show_statistics,
}) => plan.map_elements(f)?.update_data(|plan| {
LogicalPlan::Explain(Explain {
verbose,
Expand All @@ -211,17 +212,22 @@ impl TreeNode for LogicalPlan {
stringified_plans,
schema,
logical_optimization_succeeded,
show_statistics,
})
}),
LogicalPlan::Analyze(Analyze {
verbose,
input,
schema,
analyze_level,
analyze_categories,
}) => input.map_elements(f)?.update_data(|input| {
LogicalPlan::Analyze(Analyze {
verbose,
input,
schema,
analyze_level,
analyze_categories,
})
}),
LogicalPlan::Dml(DmlStatement {
Expand Down
Loading
Loading