diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index f732d6431e143..054c66bf5d65d 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -43,6 +43,10 @@ pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches"; pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str = "datafusion.execution.coalesce_target_batch_size"; +/// Configuration option "datafusion.optimizer.skip_failed_rules" +pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str = + "datafusion.optimizer.skip_failed_rules"; + /// Definition of a configuration option pub struct ConfigDefinition { /// key used to identifier this configuration option @@ -156,11 +160,18 @@ impl BuiltInConfigs { format!("Target batch size when coalescing batches. Uses in conjunction with the \ configuration setting '{}'.", OPT_COALESCE_BATCHES), 4096, + ), + ConfigDefinition::new_bool( + OPT_OPTIMIZER_SKIP_FAILED_RULES, + "When set to true, the logical plan optimizer will produce warning \ + messages if any optimization rules produce errors and then proceed to the next \ + rule. When set to false, any rules that produce errors will cause the query to fail.", + true )], } } - /// Generate documentation that can be included int he user guide + /// Generate documentation that can be included in the user guide pub fn generate_config_markdown() -> String { use std::fmt::Write as _; let configs = Self::new(); diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 42de53a888aa7..5cb45be2f0659 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -83,7 +83,7 @@ use crate::physical_optimizer::repartition::Repartition; use crate::config::{ ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE, - OPT_FILTER_NULL_JOIN_KEYS, + OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use crate::logical_plan::plan::Explain; @@ -1371,7 +1371,11 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { - let mut optimizer_config = OptimizerConfig::new(); + let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules( + self.config + .config_options + .get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES), + ); optimizer_config.query_execution_start_time = self.execution_props.query_execution_start_time; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 1fee75df6e6f7..9d76cf5e7e6e9 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -20,7 +20,7 @@ use chrono::{DateTime, Utc}; use datafusion_common::Result; use datafusion_expr::logical_plan::LogicalPlan; -use log::{debug, trace}; +use log::{debug, trace, warn}; use std::sync::Arc; /// `OptimizerRule` transforms one ['LogicalPlan'] into another which @@ -45,6 +45,8 @@ pub struct OptimizerConfig { /// to use a literal value instead pub query_execution_start_time: DateTime, next_id: usize, + /// Option to skip rules that produce errors + skip_failing_rules: bool, } impl OptimizerConfig { @@ -53,9 +55,16 @@ impl OptimizerConfig { Self { query_execution_start_time: chrono::Utc::now(), next_id: 0, // useful for generating things like unique subquery aliases + skip_failing_rules: true, } } + /// Specify whether the optimizer should skip rules that produce errors, or fail the query + pub fn with_skip_failing_rules(mut self, b: bool) -> Self { + self.skip_failing_rules = b; + self + } + pub fn next_id(&mut self) -> usize { self.next_id += 1; self.next_id @@ -97,13 +106,88 @@ impl Optimizer { debug!("Input logical plan:\n{}\n", plan.display_indent()); trace!("Full input logical plan:\n{:?}", plan); for rule in &self.rules { - new_plan = rule.optimize(&new_plan, optimizer_config)?; - observer(&new_plan, rule.as_ref()); - debug!("After apply {} rule:\n", rule.name()); - debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); + let result = rule.optimize(&new_plan, optimizer_config); + match result { + Ok(plan) => { + new_plan = plan; + observer(&new_plan, rule.as_ref()); + debug!("After apply {} rule:\n", rule.name()); + debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); + } + Err(ref e) => { + if optimizer_config.skip_failing_rules { + // Note to future readers: if you see this warning it signals a + // bug in the DataFusion optimizer. Please consider filing a ticket + // https://github.com/apache/arrow-datafusion + warn!( + "Skipping optimizer rule {} due to unexpected error: {}", + rule.name(), + e + ); + } else { + return result; + } + } + } } debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); trace!("Full Optimized logical plan:\n {:?}", new_plan); Ok(new_plan) } } + +#[cfg(test)] +mod tests { + use crate::optimizer::Optimizer; + use crate::{OptimizerConfig, OptimizerRule}; + use datafusion_common::{DFSchema, DataFusionError}; + use datafusion_expr::logical_plan::EmptyRelation; + use datafusion_expr::LogicalPlan; + use std::sync::Arc; + + #[test] + fn skip_failing_rule() -> Result<(), DataFusionError> { + let opt = Optimizer::new(vec![Arc::new(BadRule {})]); + let mut config = OptimizerConfig::new().with_skip_failing_rules(true); + let plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + opt.optimize(&plan, &mut config, &observe)?; + Ok(()) + } + + #[test] + fn no_skip_failing_rule() -> Result<(), DataFusionError> { + let opt = Optimizer::new(vec![Arc::new(BadRule {})]); + let mut config = OptimizerConfig::new().with_skip_failing_rules(false); + let plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + let result = opt.optimize(&plan, &mut config, &observe); + assert_eq!( + "Error during planning: rule failed", + format!("{}", result.err().unwrap()) + ); + Ok(()) + } + + fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {} + + struct BadRule {} + + impl OptimizerRule for BadRule { + fn optimize( + &self, + _plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> datafusion_common::Result { + Err(DataFusionError::Plan("rule failed".to_string())) + } + + fn name(&self) -> &str { + "bad rule" + } + } +} diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 41f6df85056ac..6794067bf82ba 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -43,3 +43,4 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. | | datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. | | datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | +| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. |