From 24670e404545f9957eed51e66c42d59b0337f4be Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Tue, 20 Feb 2024 12:09:27 +0100 Subject: [PATCH] Enable predicate pushdown in read_parquet --- dask_expr/io/parquet.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 8cd9e8c3d..366b2bbf7 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -39,12 +39,15 @@ And, Blockwise, Expr, + Filter, Index, Lengths, Literal, Or, Projection, + are_co_aligned, determine_column_projection, + is_filter_pushdown_available, ) from dask_expr._reductions import Len from dask_expr._util import _convert_to_list, _tokenize_deterministic @@ -452,6 +455,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO): } _pq_length_stats = None _absorb_projections = True + _filter_passthrough = True def _tree_repr_argument_construction(self, i, op, header): if self._parameters[i] == "_dataset_info_cache": @@ -486,6 +490,22 @@ def _simplify_up(self, parent, dependents): if isinstance(parent, Projection): return super()._simplify_up(parent, dependents) + if ( + isinstance(parent, Filter) + and isinstance(parent.predicate, (LE, GE, LT, GT, EQ, NE, And, Or)) + and is_filter_pushdown_available(self, parent, dependents) + ): + # Predicate pushdown + filters = _DNF.extract_pq_filters(self, parent.predicate) + if filters._filters is not None: + return self.substitute_parameters( + { + "filters": filters.combine( + self.operand("filters") + ).to_list_tuple() + } + ) + if isinstance(parent, Lengths): _lengths = self._get_lengths() if _lengths: @@ -888,19 +908,15 @@ def combine(self, other: _DNF | _And | _Or | list | tuple | None) -> _DNF: def extract_pq_filters(cls, pq_expr: ReadParquet, predicate_expr: Expr) -> _DNF: _filters = None if isinstance(predicate_expr, (LE, GE, LT, GT, EQ, NE)): - if ( - isinstance(predicate_expr.left, ReadParquet) - and predicate_expr.left.path == pq_expr.path - and not isinstance(predicate_expr.right, Expr) + if are_co_aligned(pq_expr, predicate_expr.left) and not isinstance( + predicate_expr.right, Expr ): op = predicate_expr._operator_repr column = predicate_expr.left.columns[0] value = predicate_expr.right _filters = (column, op, value) - elif ( - isinstance(predicate_expr.right, ReadParquet) - and predicate_expr.right.path == pq_expr.path - and not isinstance(predicate_expr.left, Expr) + elif are_co_aligned(pq_expr, predicate_expr.right) and not isinstance( + predicate_expr.left, Expr ): # Simple dict to make sure field comes first in filter flip = {LE: GE, LT: GT, GE: LE, GT: LT}