Labels: bug
Describe the bug
I noticed that executing the query SELECT DISTINCT canonical_url FROM dataset LIMIT 10 (through the Python connector) took a long time and consumed most of my RAM.
I then looked at the query plan, and it turns out the query is actually executed as a GROUP BY canonical_url, which causes a full scan. What makes it even worse is that all 10 returned values are present in the first Parquet file of the dataset (pyarrow.Dataset.files[0]), so the scan could have stopped immediately after the first file.
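One way to sanity-check that last claim, assuming a local Parquet dataset (the path below is hypothetical), is to collect the distinct values of the first fragment alone and compare them against the 10 rows the query returns:

import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset('/path/to/parquet')  # hypothetical path
first_file = ds.dataset(dataset.files[0])  # the first Parquet file only

# Distinct values of the column within the first file; the 10 values
# returned by the query above are all contained in this set.
first_file_values = set(
    pc.unique(first_file.to_table(columns=['canonical_url'])['canonical_url']).to_pylist()
)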
To Reproduce
import datafusion
import time

# `dataset` is a pyarrow.dataset.Dataset over local Parquet files
# (see "Additional context" below).
start = time.perf_counter()
ctx = datafusion.SessionContext()
ctx.register_dataset('dataset', dataset)
out = ctx.sql("SELECT DISTINCT canonical_url FROM dataset LIMIT 10")
print('datafusion', time.perf_counter() - start)
print(repr(out.logical_plan()))
print('-' * 10)
print(repr(out.optimized_logical_plan()))
print('-' * 10)
print(repr(out.execution_plan()))
print('-' * 10)
print(repr(out.explain()))
print('-' * 10)

Output:
datafusion 0.0003965760000141927
Limit: skip=0, fetch=10
  Distinct:
    Projection: dataset.canonical_url
      TableScan: dataset
----------
Limit: skip=0, fetch=10
  Aggregate: groupBy=[[dataset.canonical_url]], aggr=[[]]
    TableScan: dataset projection=[canonical_url]
----------
GlobalLimitExec: skip=0, fetch=10
  CoalescePartitionsExec
    LocalLimitExec: fetch=10
      AggregateExec: mode=FinalPartitioned, gby=[canonical_url@0 as canonical_url], aggr=[]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([canonical_url@0], 8), input_partitions=568
            AggregateExec: mode=Partial, gby=[canonical_url@0 as canonical_url], aggr=[]
              DatasetExec: number_of_fragments=568, projection=[canonical_url]
----------
DataFrame()
+---------------+---------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                        |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=10                                                                     |
|               |   Aggregate: groupBy=[[dataset.canonical_url]], aggr=[[]]                                   |
|               |     TableScan: dataset projection=[canonical_url]                                           |
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                           |
|               |   CoalescePartitionsExec                                                                    |
|               |     LocalLimitExec: fetch=10                                                                |
|               |       AggregateExec: mode=FinalPartitioned, gby=[canonical_url@0 as canonical_url], aggr=[] |
|               |         CoalesceBatchesExec: target_batch_size=8192                                         |
|               |           RepartitionExec: partitioning=Hash([canonical_url@0], 8), input_partitions=568    |
|               |             AggregateExec: mode=Partial, gby=[canonical_url@0 as canonical_url], aggr=[]    |
|               |               DatasetExec: number_of_fragments=568, projection=[canonical_url]              |
|               |                                                                                             |
+---------------+---------------------------------------------------------------------------------------------+
None
----------
Expected behavior
It should stop scanning as soon as the first 10 unique values have been found.
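As a minimal sketch of that behavior in plain pyarrow (first_n_distinct is a hypothetical helper, not an existing API): Dataset.to_batches() yields record batches fragment by fragment, so breaking out of the loop early avoids reading the remaining files.

import pyarrow.dataset as ds

def first_n_distinct(dataset: ds.Dataset, column: str, n: int = 10) -> list:
    # Scan lazily, one record batch at a time, and stop as soon as
    # n distinct values have been collected.
    seen = set()
    for batch in dataset.to_batches(columns=[column]):
        seen.update(batch.column(0).to_pylist())  # only one column is projected
        if len(seen) >= n:
            break
    return list(seen)[:n]

# first_10 = first_n_distinct(dataset, 'canonical_url')

On the dataset above, that should touch only the first of the 568 fragments.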
Additional context
For context, I'm querying a Parquet dataset, and the files are stored locally:
import pyarrow.dataset as ds
dataset = ds.dataset(...)