Closed
Labels: enhancement (New feature or request), performance (Make DataFusion faster)
Description
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
When running the benchmarks with DataFusion, I noticed that we scan statistics for all registered tables up front, even tables not referenced in the query. This happens in ExecutionContext::register_table. We then scan statistics a second time for the tables that are actually used in the query.
../target/release/tpch benchmark datafusion --path /mnt/bigdata/tpch-sf1000-parquet/ --format parquet --iterations 1 --debug --concurrency 24 --query 3
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 3, debug: true, iterations: 1, concurrency: 24, batch_size: 8192, path: "/mnt/bigdata/tpch-sf1000-parquet/", file_format: "parquet", mem_table: false, partitions: 8 }
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//part)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//supplier)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//partsupp)
Scanned 48 Parquet files for statistics in 1 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//customer)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//orders)
Scanned 48 Parquet files for statistics in 4 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//lineitem)
Scanned 48 Parquet files for statistics in 30 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//nation)
Scanned 1 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//region)
Scanned 1 Parquet files for statistics in 0 seconds
=== Logical plan ===
Sort: #revenue DESC NULLS FIRST, #orders.o_orderdate ASC NULLS FIRST
Projection: #lineitem.l_orderkey, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS revenue, #orders.o_orderdate, #orders.o_shippriority
Aggregate: groupBy=[[#lineitem.l_orderkey, #orders.o_orderdate, #orders.o_shippriority]], aggr=[[SUM(#lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount)]]
Filter: #customer.c_mktsegment Eq Utf8("BUILDING") And #orders.o_orderdate Lt CAST(Utf8("1995-03-15") AS Date32) And #lineitem.l_shipdate Gt CAST(Utf8("1995-03-15") AS Date32)
Join: #orders.o_orderkey = #lineitem.l_orderkey
Join: #customer.c_custkey = #orders.o_custkey
TableScan: customer projection=None
TableScan: orders projection=None
TableScan: lineitem projection=None
=== Optimized logical plan ===
Sort: #revenue DESC NULLS FIRST, #orders.o_orderdate ASC NULLS FIRST
Projection: #lineitem.l_orderkey, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS revenue, #orders.o_orderdate, #orders.o_shippriority
Aggregate: groupBy=[[#lineitem.l_orderkey, #orders.o_orderdate, #orders.o_shippriority]], aggr=[[SUM(#lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount)]]
Join: #orders.o_orderkey = #lineitem.l_orderkey
Join: #customer.c_custkey = #orders.o_custkey
Filter: #customer.c_mktsegment Eq Utf8("BUILDING")
TableScan: customer projection=Some([0, 6]), filters=[#customer.c_mktsegment Eq Utf8("BUILDING")]
Filter: #orders.o_orderdate Lt Date32("9204")
TableScan: orders projection=Some([0, 1, 4, 7]), filters=[#orders.o_orderdate Lt Date32("9204")]
Filter: #lineitem.l_shipdate Gt Date32("9204")
TableScan: lineitem projection=Some([0, 5, 6, 10]), filters=[#lineitem.l_shipdate Gt Date32("9204")]
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//customer)
Scanned 48 Parquet files for statistics in 0 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//orders)
Scanned 48 Parquet files for statistics in 4 seconds
ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//lineitem)
Scanned 48 Parquet files for statistics in 30 seconds
=== Physical plan ===
SortExec: [revenue@1 DESC,o_orderdate@2 ASC]
CoalescePartitionsExec
ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
HashAggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(l_extendedprice Multiply Int64(1) Minus l_discount)]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 24)
HashAggregateExec: mode=Partial, gby=[l_orderkey@6 as l_orderkey, o_orderdate@4 as o_orderdate, o_shippriority@5 as o_shippriority], aggr=[SUM(l_extendedprice Multiply Int64(1) Minus l_discount)]
CoalesceBatchesExec: target_batch_size=4096
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 2 }], 24)
CoalesceBatchesExec: target_batch_size=4096
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 24)
CoalesceBatchesExec: target_batch_size=4096
FilterExec: c_mktsegment@1 = BUILDING
ParquetExec: batch_size=8192, limit=None, partitions=[...]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 24)
CoalesceBatchesExec: target_batch_size=4096
FilterExec: o_orderdate@2 < 9204
ParquetExec: batch_size=8192, limit=None, partitions=[...]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 24)
CoalesceBatchesExec: target_batch_size=4096
FilterExec: l_shipdate@3 > 9204
ParquetExec: batch_size=8192, limit=None, partitions=[...]
Describe the solution you'd like
- We should only scan statistics for tables that are used in the query
- We should only scan statistics once
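One way to get both properties is to defer the statistics scan until a table is first used in a plan, and cache the result so repeated planning never rescans. The sketch below is not DataFusion's actual API; `Statistics` and `LazyStatsTable` are hypothetical names standing in for the real table provider, and the "scan" is simulated. It just shows the lazy-init-and-cache shape using `std::sync::OnceLock`:

```rust
use std::sync::OnceLock;

// Hypothetical stand-in for DataFusion's table statistics.
#[derive(Debug, Clone)]
struct Statistics {
    num_rows: usize,
}

// A table provider that computes statistics lazily, on first use,
// instead of eagerly at registration time.
struct LazyStatsTable {
    name: String,
    stats: OnceLock<Statistics>,
}

impl LazyStatsTable {
    fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            stats: OnceLock::new(),
        }
    }

    // Only the first call pays the cost of the scan; later calls
    // return the cached value, so statistics are gathered at most once
    // and only for tables the planner actually asks about.
    fn statistics(&self) -> &Statistics {
        self.stats.get_or_init(|| {
            // In real code this would read the Parquet file footers.
            println!("scanning statistics for {}", self.name);
            Statistics { num_rows: 48 }
        })
    }
}

fn main() {
    let table = LazyStatsTable::new("lineitem");
    // First access triggers the (expensive) scan...
    let first = table.statistics().num_rows;
    // ...subsequent accesses reuse the cached result.
    let second = table.statistics().num_rows;
    assert_eq!(first, second);
}
```

With this shape, registering all eight TPC-H tables costs nothing, and query 3 would only pay for `customer`, `orders`, and `lineitem`, once each.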
Describe alternatives you've considered
N/A
Additional context
N/A