Skip to content

Conversation

@wankunde
Copy link
Contributor

@wankunde wankunde commented Feb 7, 2023

What changes were proposed in this pull request?

This PR improves join stats estimation if one side can keep uniqueness(The distinct keys of the children of the join are a subset of the join keys). A common case is:

SELECT i_item_sk ss_item_sk
FROM   item,
       (SELECT DISTINCT iss.i_brand_id    brand_id,
                        iss.i_class_id    class_id,
                        iss.i_category_id category_id
        FROM   item iss) x
WHERE  i_brand_id = brand_id
       AND i_class_id = class_id
       AND i_category_id = category_id 

In this case, the row count of the join will definitely not expand.

Before this PR:

== Optimized Logical Plan ==
Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=370.8 MiB, rowCount=3.24E+7)
+- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = class_id#52)) AND (i_category_id#15 = category_id#53)), Statistics(sizeInBytes=1112.3 MiB, rowCount=3.24E+7)
   :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
   :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
   :     +- Relation spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
      +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 MiB, rowCount=2.02E+5)
         +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
            +- Relation spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)

After this PR:

== Optimized Logical Plan ==
Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=2.3 MiB, rowCount=2.02E+5)
+- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = class_id#52)) AND (i_category_id#15 = category_id#53)), Statistics(sizeInBytes=7.0 MiB, rowCount=2.02E+5)
   :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
   :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
   :     +- Relation spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
      +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 MiB, rowCount=2.02E+5)
         +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
            +- Relation spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)

Why are the changes needed?

Plan more broadcast joins to improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and TPC-DS benchmark test.

SQL Before this PR(Seconds) After this PR(Seconds)
q14a 187  164

@github-actions github-actions bot added the SQL label Feb 7, 2023
@wangyum
Copy link
Member

wangyum commented Feb 16, 2023

cc @cloud-fan

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @wankunde and @wangyum .
Merged to master/3.4.

dongjoon-hyun pushed a commit that referenced this pull request Feb 17, 2023
… uniqueness

### What changes were proposed in this pull request?
This PR improves join stats estimation if one side can keep uniqueness(The distinct keys of the children of the join are a subset of the join keys). A common case is:

```sql
SELECT i_item_sk ss_item_sk
FROM   item,
       (SELECT DISTINCT iss.i_brand_id    brand_id,
                        iss.i_class_id    class_id,
                        iss.i_category_id category_id
        FROM   item iss) x
WHERE  i_brand_id = brand_id
       AND i_class_id = class_id
       AND i_category_id = category_id
```

In this case, the row count of the join will definitely not expand.

Before this PR:

```
== Optimized Logical Plan ==
Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=370.8 MiB, rowCount=3.24E+7)
+- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = class_id#52)) AND (i_category_id#15 = category_id#53)), Statistics(sizeInBytes=1112.3 MiB, rowCount=3.24E+7)
   :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
   :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
   :     +- Relation spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
      +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 MiB, rowCount=2.02E+5)
         +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
            +- Relation spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
```

After this PR:

```
== Optimized Logical Plan ==
Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=2.3 MiB, rowCount=2.02E+5)
+- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = class_id#52)) AND (i_category_id#15 = category_id#53)), Statistics(sizeInBytes=7.0 MiB, rowCount=2.02E+5)
   :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
   :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
   :     +- Relation spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
      +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 MiB, rowCount=2.02E+5)
         +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
            +- Relation spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
```

### Why are the changes needed?
Plan more broadcast joins to improve query performance.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test and TPC-DS benchmark test.

SQL	Before this PR(Seconds)	After this PR(Seconds)
q14a	187 	164

Closes #39923 from wankunde/SPARK-39851.

Lead-authored-by: wankunde <wankunde@163.com>
Co-authored-by: Kun Wan <wankun@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 1c0bd1f)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun
Copy link
Member

I didn't set Assignee yet because of #37267 . I'll leave it to @wangyum .

@wangyum
Copy link
Member

wangyum commented Feb 17, 2023

Thank you @dongjoon-hyun.

snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
… uniqueness

### What changes were proposed in this pull request?
This PR improves join stats estimation if one side can keep uniqueness(The distinct keys of the children of the join are a subset of the join keys). A common case is:

```sql
SELECT i_item_sk ss_item_sk
FROM   item,
       (SELECT DISTINCT iss.i_brand_id    brand_id,
                        iss.i_class_id    class_id,
                        iss.i_category_id category_id
        FROM   item iss) x
WHERE  i_brand_id = brand_id
       AND i_class_id = class_id
       AND i_category_id = category_id
```

In this case, the row count of the join will definitely not expand.

Before this PR:

```
== Optimized Logical Plan ==
Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=370.8 MiB, rowCount=3.24E+7)
+- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = class_id#52)) AND (i_category_id#15 = category_id#53)), Statistics(sizeInBytes=1112.3 MiB, rowCount=3.24E+7)
   :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
   :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
   :     +- Relation spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
      +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 MiB, rowCount=2.02E+5)
         +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
            +- Relation spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
```

After this PR:

```
== Optimized Logical Plan ==
Project [i_item_sk#4 AS ss_item_sk#54], Statistics(sizeInBytes=2.3 MiB, rowCount=2.02E+5)
+- Join Inner, (((i_brand_id#11 = brand_id#51) AND (i_class_id#13 = class_id#52)) AND (i_category_id#15 = category_id#53)), Statistics(sizeInBytes=7.0 MiB, rowCount=2.02E+5)
   :- Project [i_item_sk#4, i_brand_id#11, i_class_id#13, i_category_id#15], Statistics(sizeInBytes=4.6 MiB, rowCount=2.02E+5)
   :  +- Filter ((isnotnull(i_brand_id#11) AND isnotnull(i_class_id#13)) AND isnotnull(i_category_id#15)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
   :     +- Relation spark_catalog.default.item[i_item_sk#4,i_item_id#5,i_rec_start_date#6,i_rec_end_date#7,i_item_desc#8,i_current_price#9,i_wholesale_cost#10,i_brand_id#11,i_brand#12,i_class_id#13,i_class#14,i_category_id#15,i_category#16,i_manufact_id#17,i_manufact#18,i_size#19,i_formulation#20,i_color#21,i_units#22,i_container#23,i_manager_id#24,i_product_name#25] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
   +- Aggregate [brand_id#51, class_id#52, category_id#53], [brand_id#51, class_id#52, category_id#53], Statistics(sizeInBytes=2.6 MiB, rowCount=1.37E+5)
      +- Project [i_brand_id#62 AS brand_id#51, i_class_id#64 AS class_id#52, i_category_id#66 AS category_id#53], Statistics(sizeInBytes=3.9 MiB, rowCount=2.02E+5)
         +- Filter ((isnotnull(i_brand_id#62) AND isnotnull(i_class_id#64)) AND isnotnull(i_category_id#66)), Statistics(sizeInBytes=84.6 MiB, rowCount=2.02E+5)
            +- Relation spark_catalog.default.item[i_item_sk#55,i_item_id#56,i_rec_start_date#57,i_rec_end_date#58,i_item_desc#59,i_current_price#60,i_wholesale_cost#61,i_brand_id#62,i_brand#63,i_class_id#64,i_class#65,i_category_id#66,i_category#67,i_manufact_id#68,i_manufact#69,i_size#70,i_formulation#71,i_color#72,i_units#73,i_container#74,i_manager_id#75,i_product_name#76] parquet, Statistics(sizeInBytes=85.2 MiB, rowCount=2.04E+5)
```

### Why are the changes needed?
Plan more broadcast joins to improve query performance.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test and TPC-DS benchmark test.

SQL	Before this PR(Seconds)	After this PR(Seconds)
q14a	187 	164

Closes apache#39923 from wankunde/SPARK-39851.

Lead-authored-by: wankunde <wankunde@163.com>
Co-authored-by: Kun Wan <wankun@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 1c0bd1f)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants