Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,12 @@ impl ExecutionPlan for AggregateExec {
..Default::default()
}
}
_ => Statistics::default(),
_ => Statistics {
// the output row count is surely not larger than its input row count
num_rows: self.input.statistics().num_rows,
is_exact: false,
..Default::default()
},
}
}
}
Expand Down
81 changes: 73 additions & 8 deletions datafusion/core/src/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ impl ExecutionPlan for GlobalLimitExec {
fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();
let skip = self.skip;
// the maximum number of rows that may need to be produced: skip + fetch,
// saturating at usize::MAX to avoid overflow
let max_row_num = self
.fetch
.map(|fetch| {
if fetch >= usize::MAX - skip {
usize::MAX
} else {
fetch + skip
}
})
.unwrap_or(usize::MAX);
match input_stats {
Statistics {
num_rows: Some(nr), ..
Expand All @@ -200,22 +211,25 @@ impl ExecutionPlan for GlobalLimitExec {
is_exact: input_stats.is_exact,
..Default::default()
}
} else if nr - skip <= self.fetch.unwrap_or(usize::MAX) {
} else if nr <= max_row_num {
// if the input does not reach the "fetch" globally, return input stats
input_stats
} else if nr - skip > self.fetch.unwrap_or(usize::MAX) {
} else {
// if the input is larger than skip + fetch, the reported num_rows is the
// upper bound skip + fetch (max_row_num); the other statistics cannot be predicted
Statistics {
num_rows: self.fetch,
num_rows: Some(max_row_num),
is_exact: input_stats.is_exact,
..Default::default()
}
} else {
Statistics::default()
}
}
_ => Statistics::default(),
_ => Statistics {
// the result output row number will always be no greater than the limit number
num_rows: Some(max_row_num),
is_exact: false,
..Default::default()
},
}
}
}
Expand Down Expand Up @@ -353,8 +367,12 @@ impl ExecutionPlan for LocalLimitExec {
is_exact: input_stats.is_exact,
..Default::default()
},
// if we don't know the input size, we can't predict the limit's behaviour
_ => Statistics::default(),
_ => Statistics {
// each partition emits at most `fetch` rows, so the total output is
// bounded by fetch * partition_count
num_rows: Some(self.fetch * self.output_partitioning().partition_count()),
is_exact: false,
..Default::default()
},
}
}
}
Expand Down Expand Up @@ -623,4 +641,51 @@ mod tests {
assert_eq!(row_count, 0);
Ok(())
}

#[tokio::test]
async fn test_row_number_statistics_for_global_limit() -> Result<()> {
    // With no skip, the estimated row count equals the fetch limit.
    assert_eq!(
        row_number_statistics_for_global_limit(0, Some(10)).await?,
        Some(10)
    );

    // Skipped rows are added on top of the fetch when estimating the bound.
    assert_eq!(
        row_number_statistics_for_global_limit(5, Some(10)).await?,
        Some(15)
    );

    Ok(())
}

#[tokio::test]
async fn test_row_number_statistics_for_local_limit() -> Result<()> {
    // 4 partitions, fetch of 10 per partition: the estimated upper bound
    // across all partitions is 4 * 10 = 40 rows.
    assert_eq!(row_number_statistics_for_local_limit(4, 10).await?, Some(40));

    Ok(())
}

/// Builds a `GlobalLimitExec` with the given `skip`/`fetch` over a
/// 4-partition CSV scan (coalesced to a single partition) and returns the
/// estimated row count reported by its statistics.
async fn row_number_statistics_for_global_limit(
    skip: usize,
    fetch: Option<usize>,
) -> Result<Option<usize>> {
    const NUM_PARTITIONS: usize = 4;

    let source = test::scan_partitioned_csv(NUM_PARTITIONS)?;
    assert_eq!(source.output_partitioning().partition_count(), NUM_PARTITIONS);

    // GlobalLimitExec requires a single input partition, so coalesce first.
    let coalesced = Arc::new(CoalescePartitionsExec::new(source));
    let limit = GlobalLimitExec::new(coalesced, skip, fetch);

    Ok(limit.statistics().num_rows)
}

/// Builds a `LocalLimitExec` with the given `fetch` over a CSV scan split
/// into `num_partitions` partitions and returns the estimated row count
/// reported by its statistics.
async fn row_number_statistics_for_local_limit(
    num_partitions: usize,
    fetch: usize,
) -> Result<Option<usize>> {
    let source = test::scan_partitioned_csv(num_partitions)?;
    assert_eq!(source.output_partitioning().partition_count(), num_partitions);

    let limit = LocalLimitExec::new(source, fetch);

    Ok(limit.statistics().num_rows)
}
}
6 changes: 3 additions & 3 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,9 @@ impl OptimizerRule for PushDownFilter {
)
.map(|e| (*e).clone())
.collect::<Vec<_>>();
let new_predicate = conjunction(new_predicates).ok_or(
DataFusionError::Plan("at least one expression exists".to_string()),
)?;
let new_predicate = conjunction(new_predicates).ok_or_else(|| {
DataFusionError::Plan("at least one expression exists".to_string())
})?;
let new_plan = LogicalPlan::Filter(Filter::try_new(
new_predicate,
child_filter.input.clone(),
Expand Down