Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ message LogicalPlanNode {
RepartitionNode repartition = 9;
EmptyRelationNode empty_relation = 10;
CreateExternalTableNode create_external_table = 11;
AnalyzeNode analyze = 14;
ExplainNode explain = 12;
WindowNode window = 13;
}
Expand Down Expand Up @@ -323,6 +324,11 @@ enum FileType{
CSV = 2;
}

message AnalyzeNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}

message ExplainNode{
LogicalPlanNode input = 1;
bool verbose = 2;
Expand Down
9 changes: 8 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,17 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
has_header: create_extern_table.has_header,
})
}
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan = convert_box_required!(analyze.input)?;
LogicalPlanBuilder::from(input)
.explain(analyze.verbose, true)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Explain(explain) => {
let input: LogicalPlan = convert_box_required!(explain.input)?;
LogicalPlanBuilder::from(input)
.explain(explain.verbose)?
.explain(explain.verbose, false)?
.build()
.map_err(|e| e.into())
}
Expand Down
41 changes: 39 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,43 @@ mod roundtrip_tests {
Ok(())
}

#[test]
fn roundtrip_analyze() -> Result<()> {
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("first_name", DataType::Utf8, false),
Field::new("last_name", DataType::Utf8, false),
Field::new("state", DataType::Utf8, false),
Field::new("salary", DataType::Int32, false),
]);

let verbose_plan = LogicalPlanBuilder::scan_csv(
"employee.csv",
CsvReadOptions::new().schema(&schema).has_header(true),
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
.and_then(|plan| plan.explain(true, true))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

let plan = LogicalPlanBuilder::scan_csv(
"employee.csv",
CsvReadOptions::new().schema(&schema).has_header(true),
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
.and_then(|plan| plan.explain(false, true))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

roundtrip_test!(plan);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we talk I need to learn the the effectiveness of this round trip test thats convert a logical/physical plan into photo and back. The tests look simple and easy to understand this way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I simply copy/pasted the test for roundtrip_explain below -- I agree the pattern is quite nice


roundtrip_test!(verbose_plan);

Ok(())
}

#[test]
fn roundtrip_explain() -> Result<()> {
let schema = Schema::new(vec![
Expand All @@ -677,7 +714,7 @@ mod roundtrip_tests {
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
.and_then(|plan| plan.explain(true))
.and_then(|plan| plan.explain(true, false))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

Expand All @@ -687,7 +724,7 @@ mod roundtrip_tests {
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
.and_then(|plan| plan.explain(false))
.and_then(|plan| plan.explain(false, false))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

Expand Down
11 changes: 11 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,17 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)),
})
}
LogicalPlan::Analyze { verbose, input, .. } => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
protobuf::AnalyzeNode {
input: Some(Box::new(input)),
verbose: *verbose,
},
))),
})
}
LogicalPlan::Explain { verbose, plan, .. } => {
let input: protobuf::LogicalPlanNode = plan.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
Expand Down
6 changes: 4 additions & 2 deletions datafusion/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,18 +289,20 @@ pub trait DataFrame: Send + Sync {

/// Return a DataFrame with the explanation of its plan so far.
///
/// if `analyze` is specified, runs the plan and reports metrics
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let batches = df.limit(100)?.explain(false)?.collect().await?;
/// let batches = df.limit(100)?.explain(false, false)?.collect().await?;
/// # Ok(())
/// # }
/// ```
fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>>;
fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn DataFrame>>;

/// Return a `FunctionRegistry` used to plan udf's calls
///
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ mod tests {

let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None)
.unwrap()
.explain(true)
.explain(true, false)
.unwrap()
.build()
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ impl DataFrame for DataFrameImpl {
self.plan.schema()
}

fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.explain(verbose)?
.explain(verbose, analyze)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}
Expand Down Expand Up @@ -318,7 +318,7 @@ mod tests {
let df = df
.select_columns(&["c1", "c2", "c11"])?
.limit(10)?
.explain(false)?;
.explain(false, false)?;
let plan = df.to_logical_plan();

// build query using SQL
Expand Down
36 changes: 25 additions & 11 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,18 +455,32 @@ impl LogicalPlanBuilder {
}

/// Create an expression to represent the explanation of the plan
pub fn explain(&self, verbose: bool) -> Result<Self> {
let stringified_plans =
vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

///
/// if `analyze` is true, runs the actual plan and produces
/// information about metrics during run.
///
/// if `verbose` is true, prints out additional details.
pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Self> {
let schema = LogicalPlan::explain_schema();

Ok(Self::from(LogicalPlan::Explain {
verbose,
plan: Arc::new(self.plan.clone()),
stringified_plans,
schema: schema.to_dfschema_ref()?,
}))
let schema = schema.to_dfschema_ref()?;

if analyze {
Ok(Self::from(LogicalPlan::Analyze {
verbose,
input: Arc::new(self.plan.clone()),
schema,
}))
} else {
let stringified_plans =
vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

Ok(Self::from(LogicalPlan::Explain {
verbose,
plan: Arc::new(self.plan.clone()),
stringified_plans,
schema,
}))
}
}

/// Build the plan
Expand Down
16 changes: 16 additions & 0 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ pub enum LogicalPlan {
/// The output schema of the explain (2 columns of text)
schema: DFSchemaRef,
},
/// Runs the actual plan, and then prints the physical plan with
/// with execution metrics.
Analyze {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: I chose a new LogicalPlan node because the implementation for ANALYZE is so different than EXPLAIN. However, it would be possible to re-use the same LogicalPlan node if people prefer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am about to ask when I saw the above code that has analyze as different function. I am worry about future inconsistency and headache of keeping them consistent, as well as redundant work when we change or improve something. I would prefer to keep them in the same LogicalPlan

/// Should extra detail be included?
verbose: bool,
/// The logical plan that is being EXPLAIN ANALYZE'd
input: Arc<LogicalPlan>,
/// The output schema of the explain (2 columns of text)
schema: DFSchemaRef,
},
/// Extension operator defined outside of DataFusion
Extension {
/// The runtime extension operator
Expand All @@ -239,6 +249,7 @@ impl LogicalPlan {
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => schema,
LogicalPlan::Explain { schema, .. } => schema,
LogicalPlan::Analyze { schema, .. } => schema,
LogicalPlan::Extension { node } => node.schema(),
LogicalPlan::Union { schema, .. } => schema,
}
Expand Down Expand Up @@ -278,6 +289,7 @@ impl LogicalPlan {
}
LogicalPlan::Extension { node } => vec![node.schema()],
LogicalPlan::Explain { schema, .. }
| LogicalPlan::Analyze { schema, .. }
| LogicalPlan::EmptyRelation { schema, .. }
| LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
LogicalPlan::Limit { input, .. }
Expand Down Expand Up @@ -327,6 +339,7 @@ impl LogicalPlan {
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Union { .. } => {
vec![]
Expand All @@ -350,6 +363,7 @@ impl LogicalPlan {
LogicalPlan::Extension { node } => node.inputs(),
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain { plan, .. } => vec![plan],
LogicalPlan::Analyze { input: plan, .. } => vec![plan],
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
Expand Down Expand Up @@ -495,6 +509,7 @@ impl LogicalPlan {
true
}
LogicalPlan::Explain { plan, .. } => plan.accept(visitor)?,
LogicalPlan::Analyze { input: plan, .. } => plan.accept(visitor)?,
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
Expand Down Expand Up @@ -790,6 +805,7 @@ impl LogicalPlan {
write!(f, "CreateExternalTable: {:?}", name)
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union { .. } => write!(f, "Union"),
LogicalPlan::Extension { ref node } => node.fmt_for_explain(f),
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// push the optimization to the plan of this explain
push_down(&state, plan)
}
LogicalPlan::Analyze { .. } => push_down(&state, plan),
LogicalPlan::Filter { input, predicate } => {
let mut predicates = vec![];
split_members(predicate, &mut predicates);
Expand Down
6 changes: 6 additions & 0 deletions datafusion/src/optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
// we cannot predict how rows will be repartitioned
None
}
LogicalPlan::Analyze { .. } => {
// Analyze produces one row, verbose produces more
// but it should never be used as an input to a Join anyways
None
}
// the following operators are special cases and not querying data
LogicalPlan::CreateExternalTable { .. } => None,
LogicalPlan::Explain { .. } => None,
Expand Down Expand Up @@ -201,6 +206,7 @@ impl OptimizerRule for HashBuildProbeOrder {
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
Expand Down
25 changes: 25 additions & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,31 @@ fn optimize_plan(
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Analyze {
input,
verbose,
schema,
} => {
// make sure we keep all the columns from the input plan
let required_columns = input
.schema()
.fields()
.iter()
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();

Ok(LogicalPlan::Analyze {
input: Arc::new(optimize_plan(
optimizer,
input,
&required_columns,
false,
execution_props,
)?),
verbose: *verbose,
schema: schema.clone(),
})
}
LogicalPlan::Union {
inputs,
schema,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ pub fn from_plan(
LogicalPlan::EmptyRelation { .. }
| LogicalPlan::TableScan { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. } => Ok(plan.clone()),
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. } => Ok(plan.clone()),
}
}

Expand Down
Loading