Merged
1 change: 1 addition & 0 deletions Cargo.lock


58 changes: 52 additions & 6 deletions datafusion/datasource-parquet/src/opener.rs
@@ -409,12 +409,58 @@ impl FileOpener for ParquetOpener {
.with_row_groups(row_group_indexes)
.build()?;

let adapted = stream
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
.map(move |maybe_batch| {
maybe_batch
.and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
});
// Create a stateful stream that can check pruning after each batch
let adapted = {
Contributor:
I found this code somewhat 🤯 (and this function is already 100s of lines long). I spent some time refactoring it into its own stream for readability, and I also understand it better now. I'll put up a follow-on PR to extract this logic -- no need to do it in this one.


use futures::stream;
let schema_mapping = Some(schema_mapping);
let file_pruner = file_pruner;
let stream = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
let files_ranges_pruned_statistics =
file_metrics.files_ranges_pruned_statistics.clone();

stream::try_unfold(
(
stream,
schema_mapping,
file_pruner,
files_ranges_pruned_statistics,
),
move |(
mut stream,
schema_mapping_opt,
mut file_pruner,
files_ranges_pruned_statistics,
)| async move {
match stream.try_next().await? {
Some(batch) => {
let schema_mapping = schema_mapping_opt.as_ref().unwrap();
let mapped_batch = schema_mapping.map_batch(batch)?;

// Check if we can prune the file now
if let Some(ref mut pruner) = file_pruner {
if pruner.should_prune()? {
// File can now be pruned based on updated dynamic filters
files_ranges_pruned_statistics.add(1);
// Terminate the stream early
return Ok(None);
}
}

Ok(Some((
mapped_batch,
(
stream,
schema_mapping_opt,
file_pruner,
files_ranges_pruned_statistics,
),
)))
}
None => Ok(None),
}
},
)
};

Ok(adapted.boxed())
}))
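
Per the review comment above, this unfold could be pulled out into its own stream constructor in a follow-on PR. A rough sketch of what that extraction might look like -- all names here (`pruned_batch_stream`, the `mapper`/`pruner` closure parameters) are hypothetical, the metrics counter is omitted for brevity, and this is not the actual follow-up:

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use futures::{stream, Stream, TryStreamExt};

/// Sketch: map each batch through `mapper`, and end the stream early once
/// `pruner` reports that the rest of the file can be skipped.
fn pruned_batch_stream<S, M, P>(
    input: S,
    mapper: M,
    pruner: Option<P>,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>>
where
    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
    M: Fn(RecordBatch) -> Result<RecordBatch, ArrowError>,
    P: FnMut() -> Result<bool, ArrowError>,
{
    stream::try_unfold(
        (input, mapper, pruner),
        |(mut input, mapper, mut pruner)| async move {
            match input.try_next().await? {
                Some(batch) => {
                    let mapped = mapper(batch)?;
                    // Dynamic filters may have tightened since the scan
                    // started; stop reading as soon as pruning succeeds.
                    if let Some(p) = pruner.as_mut() {
                        if p()? {
                            return Ok(None);
                        }
                    }
                    Ok(Some((mapped, (input, mapper, pruner))))
                }
                None => Ok(None),
            }
        },
    )
}
```

The state tuple threaded through `try_unfold` is exactly what the inline version above does; naming it as a function makes the per-step ownership transfer easier to see.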
1 change: 1 addition & 0 deletions datafusion/physical-expr/Cargo.toml
@@ -50,6 +50,7 @@ hashbrown = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true, features = ["use_std"] }
log = { workspace = true }
parking_lot = { workspace = true }
paste = "^1.0"
petgraph = "0.8.2"

61 changes: 20 additions & 41 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -15,12 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::{
any::Any,
fmt::Display,
hash::Hash,
sync::{Arc, RwLock},
};
use parking_lot::RwLock;
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};

use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
@@ -72,6 +68,11 @@ impl Inner {
expr,
}
}

/// Return a reference to the inner expression.
fn expr(&self) -> &Arc<dyn PhysicalExpr> {
&self.expr
}
}

impl Hash for DynamicFilterPhysicalExpr {
@@ -176,20 +177,8 @@ impl DynamicFilterPhysicalExpr {
/// This will return the current expression with any children
/// remapped to match calls to [`PhysicalExpr::with_new_children`].
pub fn current(&self) -> Result<Arc<dyn PhysicalExpr>> {
let inner = Arc::clone(
&self
.inner
.read()
.map_err(|_| {
datafusion_common::DataFusionError::Execution(
"Failed to acquire read lock for inner".to_string(),
)
})?
.expr,
);
let inner =
Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
Ok(inner)
let expr = Arc::clone(self.inner.read().expr());
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
}

/// Update the current expression.
@@ -199,11 +188,6 @@
/// - When we've computed the probe side's hash table in a HashJoinExec
/// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach.
pub fn update(&self, new_expr: Arc<dyn PhysicalExpr>) -> Result<()> {
let mut current = self.inner.write().map_err(|_| {
datafusion_common::DataFusionError::Execution(
"Failed to acquire write lock for inner".to_string(),
)
})?;
// Remap the children of the new expression to match the original children
// We still do this again in `current()` but doing it preventively here
// reduces the work needed in some cases if `current()` is called multiple times
@@ -213,10 +197,13 @@
self.remapped_children.as_ref(),
new_expr,
)?;
// Update the inner expression to the new expression.
current.expr = new_expr;
// Increment the generation to indicate that the expression has changed.
current.generation += 1;

// Load the current inner, increment generation, and store the new one
let mut current = self.inner.write();
*current = Inner {
generation: current.generation + 1,
expr: new_expr,
};
Ok(())
}
}
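
For context on the `std::sync::RwLock` → `parking_lot::RwLock` switch driving these simplifications: `parking_lot` locks are not subject to poisoning, so `read()`/`write()` return guards directly instead of a `Result`, which is what removes all of the `map_err`/`expect` handling in this file. A minimal standalone sketch:

```rust
use std::sync::Arc;

use parking_lot::RwLock;

fn main() {
    let generation = Arc::new(RwLock::new(0u64));

    // `write()` returns the guard directly; with `std::sync::RwLock` this
    // would be `write().unwrap()` (or a `map_err` into a DataFusionError),
    // because a panicking writer poisons the std lock.
    *generation.write() += 1;

    assert_eq!(*generation.read(), 1);
}
```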
@@ -253,10 +240,8 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
{
use datafusion_common::internal_err;
// Check if the data type has changed.
let mut data_type_lock = self
.data_type
.write()
.expect("Failed to acquire write lock for data_type");
let mut data_type_lock = self.data_type.write();

if let Some(existing) = &*data_type_lock {
if existing != &res {
// If the data type has changed, we have a bug.
@@ -278,10 +263,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
{
use datafusion_common::internal_err;
// Check if the nullability has changed.
let mut nullable_lock = self
.nullable
.write()
.expect("Failed to acquire write lock for nullable");
let mut nullable_lock = self.nullable.write();
if let Some(existing) = *nullable_lock {
if existing != res {
// If the nullability has changed, we have a bug.
@@ -324,10 +306,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {

fn snapshot_generation(&self) -> u64 {
// Return the current generation of the expression.
self.inner
.read()
.expect("Failed to acquire read lock for inner")
.generation
self.inner.read().generation
}
}

33 changes: 22 additions & 11 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -24,6 +24,8 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use parking_lot::RwLock;

use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
@@ -39,8 +41,10 @@ use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::spill::get_record_batch_memory_size;
use crate::spill::in_progress_spill_file::InProgressSpillFile;
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
use crate::stream::{BatchSplitStream, RecordBatchStreamAdapter};
use crate::stream::BatchSplitStream;
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::topk::TopKDynamicFilters;
use crate::{
DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
@@ -51,7 +55,10 @@ use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
use datafusion_common::{
internal_datafusion_err, internal_err, unwrap_or_internal_err, DataFusionError,
Result,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
@@ -887,8 +894,10 @@ pub struct SortExec {
common_sort_prefix: Vec<PhysicalSortExpr>,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
/// Filter matching the state of the sort for dynamic filter pushdown
filter: Option<Arc<DynamicFilterPhysicalExpr>>,
/// Filter matching the state of the sort for dynamic filter pushdown.
/// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
/// If `fetch` is `None`, this will be `None`.
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}

impl SortExec {
@@ -934,14 +943,16 @@ impl SortExec {
self
}

/// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`.
fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
/// Add or reset `self.filter` to a new `TopKDynamicFilters`.
fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
let children = self
.expr
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
.collect::<Vec<_>>();
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(children, lit(true)),
))))
}
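
`TopKDynamicFilters` is introduced elsewhere in this PR (in the `topk` module), so its definition does not appear in this diff. Inferred purely from the calls shown here (`new` and `expr`), a hypothetical minimal shape might look like the following -- the real type may carry more state:

```rust
use std::sync::Arc;

// Sketch only; assumes the re-export path used elsewhere in DataFusion.
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;

/// Sketch: shared dynamic-filter state for TopK operators.
pub struct TopKDynamicFilters {
    /// The predicate that TopK instances tighten as they see better bounds.
    expr: Arc<DynamicFilterPhysicalExpr>,
}

impl TopKDynamicFilters {
    pub fn new(expr: Arc<DynamicFilterPhysicalExpr>) -> Self {
        Self { expr }
    }

    /// The current dynamic filter expression (cloned `Arc`).
    pub fn expr(&self) -> Arc<DynamicFilterPhysicalExpr> {
        Arc::clone(&self.expr)
    }
}
```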

fn cloned(&self) -> Self {
@@ -1080,7 +1091,7 @@ impl DisplayAs for SortExec {
Some(fetch) => {
write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
if let Some(filter) = &self.filter {
if let Ok(current) = filter.current() {
if let Ok(current) = filter.read().expr().current() {
if !current.eq(&lit(true)) {
write!(f, ", filter=[{current}]")?;
}
@@ -1216,6 +1227,7 @@ impl ExecutionPlan for SortExec {
))),
(true, None) => Ok(input),
(false, Some(fetch)) => {
let filter = self.filter.clone();
let mut topk = TopK::try_new(
partition,
input.schema(),
@@ -1225,7 +1237,7 @@
context.session_config().batch_size(),
context.runtime_env(),
&self.metrics_set,
self.filter.clone(),
Arc::clone(&unwrap_or_internal_err!(filter)),
)?;
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
@@ -1349,8 +1361,7 @@ impl ExecutionPlan for SortExec {

if let Some(filter) = &self.filter {
if config.optimizer.enable_dynamic_filter_pushdown {
child =
child.with_self_filter(Arc::clone(filter) as Arc<dyn PhysicalExpr>);
child = child.with_self_filter(filter.read().expr());
}
}
