13 changes: 12 additions & 1 deletion datafusion/core/src/config.rs
@@ -43,6 +43,10 @@ pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Configuration option "datafusion.optimizer.skip_failed_rules"
Contributor:
💯 for it being a config setting

Contributor:
Yeah, I love this too! There are a few places where I would want our product to fail on an error here, but others where I would not, so making it configurable is great.

pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
"datafusion.optimizer.skip_failed_rules";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identify this configuration option
@@ -156,11 +160,18 @@ impl BuiltInConfigs {
format!("Target batch size when coalescing batches. Uses in conjunction with the \
configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
),
ConfigDefinition::new_bool(
OPT_OPTIMIZER_SKIP_FAILED_RULES,
"When set to true, the logical plan optimizer will produce warning \
messages if any optimization rules produce errors and then proceed to the next \
rule. When set to false, any rules that produce errors will cause the query to fail.",
true
)],
}
}

/// Generate documentation that can be included int he user guide
/// Generate documentation that can be included in the user guide
pub fn generate_config_markdown() -> String {
use std::fmt::Write as _;
let configs = Self::new();
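For reference, a minimal sketch of how the new key could be read back and overridden through `ConfigOptions`; it assumes `ConfigOptions::new`, `set_bool`, and `get_bool` behave for this key exactly as they do for the other boolean options registered in this file.

```rust
use datafusion::config::{ConfigOptions, OPT_OPTIMIZER_SKIP_FAILED_RULES};

fn main() {
    // The default comes from the ConfigDefinition registered above: true.
    let mut options = ConfigOptions::new();
    assert!(options.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES));

    // Opt into strict behaviour: a failing optimizer rule aborts the query.
    options.set_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES, false);
    assert!(!options.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES));
}
```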
8 changes: 6 additions & 2 deletions datafusion/core/src/execution/context.rs
@@ -83,7 +83,7 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
@@ -1371,7 +1371,11 @@ impl SessionState {

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let mut optimizer_config = OptimizerConfig::new();
let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules(
self.config
.config_options
.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES),
);
optimizer_config.query_execution_start_time =
self.execution_props.query_execution_start_time;

94 changes: 89 additions & 5 deletions datafusion/optimizer/src/optimizer.rs
@@ -20,7 +20,7 @@
use chrono::{DateTime, Utc};
use datafusion_common::Result;
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace};
use log::{debug, trace, warn};
use std::sync::Arc;

/// `OptimizerRule` transforms one [`LogicalPlan`] into another which
@@ -45,6 +45,8 @@ pub struct OptimizerConfig {
/// to use a literal value instead
pub query_execution_start_time: DateTime<Utc>,
next_id: usize,
/// Option to skip rules that produce errors
skip_failing_rules: bool,
}

impl OptimizerConfig {
@@ -53,9 +55,16 @@ impl OptimizerConfig {
Self {
query_execution_start_time: chrono::Utc::now(),
next_id: 0, // useful for generating things like unique subquery aliases
skip_failing_rules: true,
}
}

/// Specify whether the optimizer should skip rules that produce errors, or fail the query
pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
self.skip_failing_rules = b;
self
}

pub fn next_id(&mut self) -> usize {
self.next_id += 1;
self.next_id
@@ -97,13 +106,88 @@ impl Optimizer {
debug!("Input logical plan:\n{}\n", plan.display_indent());
trace!("Full input logical plan:\n{:?}", plan);
for rule in &self.rules {
new_plan = rule.optimize(&new_plan, optimizer_config)?;
observer(&new_plan, rule.as_ref());
debug!("After apply {} rule:\n", rule.name());
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
let result = rule.optimize(&new_plan, optimizer_config);
match result {
Ok(plan) => {
new_plan = plan;
observer(&new_plan, rule.as_ref());
debug!("After apply {} rule:\n", rule.name());
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
}
Err(ref e) => {
if optimizer_config.skip_failing_rules {
Member Author:
This is the main change here - we now have the option to ignore optimization rules that fail.

// Note to future readers: if you see this warning it signals a
// bug in the DataFusion optimizer. Please consider filing a ticket
// https://github.com/apache/arrow-datafusion
warn!(
"Skipping optimizer rule {} due to unexpected error: {}",
rule.name(),
e
);
} else {
return result;
}
}
}
}
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
trace!("Full Optimized logical plan:\n {:?}", new_plan);
Ok(new_plan)
}
}

#[cfg(test)]
mod tests {
use crate::optimizer::Optimizer;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::{DFSchema, DataFusionError};
use datafusion_expr::logical_plan::EmptyRelation;
use datafusion_expr::LogicalPlan;
use std::sync::Arc;

#[test]
fn skip_failing_rule() -> Result<(), DataFusionError> {
let opt = Optimizer::new(vec![Arc::new(BadRule {})]);
let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
opt.optimize(&plan, &mut config, &observe)?;
Ok(())
}

#[test]
fn no_skip_failing_rule() -> Result<(), DataFusionError> {
let opt = Optimizer::new(vec![Arc::new(BadRule {})]);
let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
let result = opt.optimize(&plan, &mut config, &observe);
assert_eq!(
"Error during planning: rule failed",
format!("{}", result.err().unwrap())
);
Ok(())
}

fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

struct BadRule {}

impl OptimizerRule for BadRule {
fn optimize(
&self,
_plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
Err(DataFusionError::Plan("rule failed".to_string()))
}

fn name(&self) -> &str {
"bad rule"
}
}
}
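For illustration, a hedged sketch of how a caller outside the tests might drive the optimizer with the new builder and an `observer` callback; it relies only on the signatures visible in this diff (`OptimizerConfig::new().with_skip_failing_rules(..)`, `optimize(&plan, &mut config, &observer)`), and the crate paths mirror the `use` statements in the test module above.

```rust
use datafusion_common::Result;
use datafusion_expr::LogicalPlan;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};

/// Observer invoked after each rule that succeeds; here it only logs progress.
fn observe(plan: &LogicalPlan, rule: &dyn OptimizerRule) {
    println!("after {}:\n{}", rule.name(), plan.display_indent());
}

/// Optimize leniently: warn and continue past failing rules instead of
/// aborting the whole query.
fn optimize_leniently(opt: &Optimizer, plan: &LogicalPlan) -> Result<LogicalPlan> {
    let mut config = OptimizerConfig::new().with_skip_failing_rules(true);
    opt.optimize(plan, &mut config, &observe)
}
```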
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -43,3 +43,4 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. |
| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. |
| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. |
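And a hedged end-to-end sketch of turning the option off for a whole session, so a failing optimizer rule surfaces as a query error rather than a warning; it assumes `SessionConfig::set_bool` and `SessionContext::with_config` are available, as for the other boolean options in this table.

```rust
use datafusion::config::OPT_OPTIMIZER_SKIP_FAILED_RULES;
use datafusion::prelude::{SessionConfig, SessionContext};

/// Build a session that fails queries instead of skipping failing optimizer rules.
fn strict_context() -> SessionContext {
    let config = SessionConfig::new().set_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES, false);
    SessionContext::with_config(config)
}
```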