Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions datafusion/core/tests/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ async fn count_wildcard() -> Result<()> {
let sql_results = ctx
.sql("select count(*) from alltypes_tiny_pages")
.await?
.select(vec![count(Expr::Wildcard)])?
.explain(false, false)?
.collect()
.await?;

// add `.select(vec![count(Expr::Wildcard)])?` to make sure we can analyze all node instead of just top node.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let df_results = ctx
.table("alltypes_tiny_pages")
.await?
.aggregate(vec![], vec![count(Expr::Wildcard)])?
.explain(false, false)
.unwrap()
.select(vec![count(Expr::Wildcard)])?
.explain(false, false)?
.collect()
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use crate::analyzer::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{aggregate_function, lit, Aggregate, Expr, LogicalPlan, Window};
use std::ops::Deref;
use std::sync::Arc;

use crate::analyzer::AnalyzerRule;
use crate::rewrite::TreeNodeRewritable;

/// Rewrite `Count(Expr:Wildcard)` to `Count(Expr:Literal)`.
/// Resolve issue: https://github.com/apache/arrow-datafusion/issues/5473.
pub struct CountWildcardRule {}

impl Default for CountWildcardRule {
Expand All @@ -39,45 +41,42 @@ impl CountWildcardRule {
}
impl AnalyzerRule for CountWildcardRule {
fn analyze(&self, plan: &LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
let new_plan = match plan {
LogicalPlan::Window(window) => {
let inputs = plan.inputs();
let window_expr = window.clone().window_expr;
let window_expr = handle_wildcard(window_expr).unwrap();
LogicalPlan::Window(Window {
input: Arc::new(inputs.get(0).unwrap().deref().clone()),
window_expr,
schema: plan.schema().clone(),
})
}

LogicalPlan::Aggregate(aggregate) => {
let inputs = plan.inputs();
let aggr_expr = aggregate.clone().aggr_expr;
let aggr_expr = handle_wildcard(aggr_expr).unwrap();
LogicalPlan::Aggregate(
Aggregate::try_new_with_schema(
Arc::new(inputs.get(0).unwrap().deref().clone()),
aggregate.clone().group_expr,
aggr_expr,
plan.schema().clone(),
)
.unwrap(),
)
}
_ => plan.clone(),
};
Ok(new_plan)
plan.clone().transform_down(&analyze_internal)
}

fn name(&self) -> &str {
"count_wildcard_rule"
}
}

//handle Count(Expr:Wildcard) with DataFrame API
pub fn handle_wildcard(exprs: Vec<Expr>) -> Result<Vec<Expr>> {
let exprs: Vec<Expr> = exprs
fn analyze_internal(plan: LogicalPlan) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Window(window) => {
let window_expr = handle_wildcard(&window.window_expr);
Ok(Some(LogicalPlan::Window(Window {
input: window.input.clone(),
window_expr,
schema: window.schema,
})))
}
LogicalPlan::Aggregate(agg) => {
let aggr_expr = handle_wildcard(&agg.aggr_expr);
Ok(Some(LogicalPlan::Aggregate(
Aggregate::try_new_with_schema(
agg.input.clone(),
agg.group_expr.clone(),
aggr_expr,
agg.schema,
)?,
)))
}
_ => Ok(None),
}
}

// handle Count(Expr:Wildcard) with DataFrame API
pub fn handle_wildcard(exprs: &[Expr]) -> Vec<Expr> {
exprs
.iter()
.map(|expr| match expr {
Expr::AggregateFunction(AggregateFunction {
Expand All @@ -96,6 +95,5 @@ pub fn handle_wildcard(exprs: Vec<Expr>) -> Result<Vec<Expr>> {
},
_ => expr.clone(),
})
.collect();
Ok(exprs)
.collect()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use crate::count_wildcard_rule::CountWildcardRule;
mod count_wildcard_rule;

use crate::analyzer::count_wildcard_rule::CountWildcardRule;
use crate::rewrite::TreeNodeRewritable;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_expr::{

use crate::{OptimizerConfig, OptimizerRule};

/// Optimization rule that elimanate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
/// Optimization rule that eliminate the scalar value (true/false) filter with an [LogicalPlan::EmptyRelation]
#[derive(Default)]
pub struct EliminateFilter;

Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub mod type_coercion;
pub mod unwrap_cast_in_comparison;
pub mod utils;

pub mod count_wildcard_rule;
#[cfg(test)]
pub mod test;

Expand Down