From 3fd94b0597fe5468699e1f7edba16d4b2492ce71 Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Fri, 21 Jul 2023 21:57:33 +0800 Subject: [PATCH] Enable Parallel Hash Left Anti Semi (Not-In) Join(parallel-oblivious). This is a parallel-oblivious parallel hash join, that each inner side table would be duplicately processed without a shared hash table. We could benefit from parallel if the outer table is large and inner table is relatively small. See [1] below for example DDL and DML. Non-parallel plan: explain(analyze, costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2); QUERY PLAN ------------------------------------------------------------------------------------------------------------------------ Finalize Aggregate (actual time=1808.872..1808.875 rows=1 loops=1) -> Gather Motion 3:1 (slice1; segments: 3) (actual time=1745.235..1808.858 rows=3 loops=1) -> Partial Aggregate (actual time=1808.622..1808.625 rows=1 loops=1) -> Hash Left Anti Semi (Not-In) Join (actual time=2.890..1583.005 rows=1667434 loops=1) Hash Cond: (t1.c1 = t2.c1) Extra Text: (seg2) Hash chain length 1.0 avg, 2 max, using 1199 of 524288 buckets. -> Seq Scan on t1 (actual time=0.355..678.531 rows=1667832 loops=1) -> Hash (actual time=2.068..2.069 rows=1200 loops=1) Buckets: 524288 Batches: 1 Memory Usage: 4139kB -> Broadcast Motion 3:3 (slice2; segments: 3) (actual time=1.476..1.772 rows=1200 loops=1) -> Seq Scan on t2 (actual time=0.356..0.499 rows=407 loops=1) Planning Time: 0.454 ms (slice0) Executor memory: 124K bytes. (slice1) Executor memory: 4443K bytes avg x 3x(0) workers, 4443K bytes max (seg0). Work_mem: 4139K bytes max. (slice2) Executor memory: 262K bytes avg x 3x(0) workers, 262K bytes max (seg0). Memory used: 128000kB Optimizer: Postgres query optimizer Execution Time: 1809.517 ms (18 rows) Time: 1810.827 ms (00:01.811) Parallel plan: explain(analyze, costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2); QUERY PLAN ------------------------------------------------------------------------------------------------------------------------ Finalize Aggregate (actual time=758.707..758.710 rows=1 loops=1) -> Gather Motion 6:1 (slice1; segments: 6) (actual time=668.747..758.685 rows=6 loops=1) -> Partial Aggregate (actual time=752.479..752.483 rows=1 loops=1) -> Hash Left Anti Semi (Not-In) Join (actual time=3.010..565.127 rows=833732 loops=1) Hash Cond: (t1.c1 = t2.c1) Extra Text: (seg2) Hash chain length 1.0 avg, 2 max, using 1199 of 524288 buckets. -> Parallel Seq Scan on t1 (actual time=0.368..231.049 rows=833932 loops=1) -> Hash (actual time=2.148..2.149 rows=1200 loops=1) Buckets: 524288 Batches: 1 Memory Usage: 4139kB -> Broadcast Motion 3:6 (slice2; segments: 3) (actual time=0.203..1.779 rows=1200 loops=1) -> Seq Scan on t2 (actual time=0.361..0.499 rows=407 loops=1) Planning Time: 0.470 ms (slice0) Executor memory: 124K bytes. (slice1) Executor memory: 4483K bytes avg x 6x(0) workers, 4483K bytes max (seg0). Work_mem: 4139K bytes max. (slice2) Executor memory: 262K bytes avg x 3x(0) workers, 262K bytes max (seg0). Memory used: 128000kB Optimizer: Postgres query optimizer Execution Time: 759.440 ms (18 rows) Time: 760.874 ms [1] Example: create table t1(c1 int, c2 int) using ao_row distributed by (c1); create table t2(c1 int, c2 int) using ao_row distributed by (c1); set enable_parallel = on; set gp_appendonly_insert_files = 2; set gp_appendonly_insert_files_tuples_range = 100; set max_parallel_workers_per_gather = 2; insert into t1 select i, i from generate_series(1, 5000000) i; insert into t2 select i+1, i from generate_series(1, 1200) i; analyze t1; analyze t2; Authored-by: Zhang Mingli avamingli@gmail.com --- src/backend/optimizer/path/joinpath.c | 2 +- src/test/regress/expected/gp_parallel.out | 75 +++++++++++++++++++++++ src/test/regress/sql/gp_parallel.sql | 33 ++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index c762ec61fbe..c12317b2a89 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -2299,7 +2299,6 @@ hash_inner_and_outer(PlannerInfo *root, save_jointype != JOIN_UNIQUE_OUTER && save_jointype != JOIN_FULL && save_jointype != JOIN_RIGHT && - save_jointype != JOIN_LASJ_NOTIN && save_jointype != JOIN_DEDUP_SEMI && save_jointype != JOIN_DEDUP_SEMI_REVERSE && outerrel->partial_pathlist != NIL && @@ -2319,6 +2318,7 @@ hash_inner_and_outer(PlannerInfo *root, */ if (innerrel->partial_pathlist != NIL && save_jointype != JOIN_UNIQUE_INNER && + save_jointype != JOIN_LASJ_NOTIN && enable_parallel_hash) { cheapest_partial_inner = diff --git a/src/test/regress/expected/gp_parallel.out b/src/test/regress/expected/gp_parallel.out index 21031b2accc..23bba396d3d 100644 --- a/src/test/regress/expected/gp_parallel.out +++ b/src/test/regress/expected/gp_parallel.out @@ -1590,6 +1590,81 @@ select * from t1 order by c2 asc limit 3 offset 5; abort; -- +-- Test Parallel Hash Left Anti Semi (Not-In) Join(parallel-oblivious). +-- +create table t1(c1 int, c2 int) using ao_row distributed by (c1); +create table t2(c1 int, c2 int) using ao_row distributed by (c1); +create table t3_null(c1 int, c2 int) using ao_row distributed by (c1); +set enable_parallel = on; +set gp_appendonly_insert_files = 2; +set gp_appendonly_insert_files_tuples_range = 100; +set max_parallel_workers_per_gather = 2; +insert into t1 select i, i from generate_series(1, 5000000) i; +insert into t2 select i+1, i from generate_series(1, 1200) i; +insert into t3_null select i+1, i from generate_series(1, 1200) i; +insert into t3_null values(NULL, NULL); +analyze t1; +analyze t2; +analyze t3_null; +explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2); + QUERY PLAN +--------------------------------------------------------------------------- + Finalize Aggregate + -> Gather Motion 6:1 (slice1; segments: 6) + -> Partial Aggregate + -> Hash Left Anti Semi (Not-In) Join + Hash Cond: (t1.c1 = t2.c1) + -> Parallel Seq Scan on t1 + -> Hash + -> Broadcast Motion 3:6 (slice2; segments: 3) + -> Seq Scan on t2 + Optimizer: Postgres query optimizer +(10 rows) + +select sum(t1.c1) from t1 where c1 not in (select c1 from t2); + sum +---------------- + 12500001778200 +(1 row) + +explain(costs off) select * from t1 where c1 not in (select c1 from t3_null); + QUERY PLAN +--------------------------------------------------------------- + Gather Motion 6:1 (slice1; segments: 6) + -> Hash Left Anti Semi (Not-In) Join + Hash Cond: (t1.c1 = t3_null.c1) + -> Parallel Seq Scan on t1 + -> Hash + -> Broadcast Motion 3:6 (slice2; segments: 3) + -> Seq Scan on t3_null + Optimizer: Postgres query optimizer +(8 rows) + +select * from t1 where c1 not in (select c1 from t3_null); + c1 | c2 +----+---- +(0 rows) + +-- non-parallel results. +set enable_parallel = off; +select sum(t1.c1) from t1 where c1 not in (select c1 from t2); + sum +---------------- + 12500001778200 +(1 row) + +select * from t1 where c1 not in (select c1 from t3_null); + c1 | c2 +----+---- +(0 rows) + +drop table t1; +drop table t2; +drop table t3_null; +-- +-- End of Test Parallel Hash Left Anti Semi (Not-In) Join. +-- +-- -- Test alter ao/aocs table parallel_workers options -- begin; diff --git a/src/test/regress/sql/gp_parallel.sql b/src/test/regress/sql/gp_parallel.sql index e6fd9657e86..6feaeaa4117 100644 --- a/src/test/regress/sql/gp_parallel.sql +++ b/src/test/regress/sql/gp_parallel.sql @@ -465,6 +465,39 @@ set local enable_parallel = off; explain(costs off, locus) select * from t1 order by c2 asc limit 3 offset 5; select * from t1 order by c2 asc limit 3 offset 5; abort; + +-- +-- Test Parallel Hash Left Anti Semi (Not-In) Join(parallel-oblivious). +-- +create table t1(c1 int, c2 int) using ao_row distributed by (c1); +create table t2(c1 int, c2 int) using ao_row distributed by (c1); +create table t3_null(c1 int, c2 int) using ao_row distributed by (c1); +set enable_parallel = on; +set gp_appendonly_insert_files = 2; +set gp_appendonly_insert_files_tuples_range = 100; +set max_parallel_workers_per_gather = 2; +insert into t1 select i, i from generate_series(1, 5000000) i; +insert into t2 select i+1, i from generate_series(1, 1200) i; +insert into t3_null select i+1, i from generate_series(1, 1200) i; +insert into t3_null values(NULL, NULL); +analyze t1; +analyze t2; +analyze t3_null; +explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c1 from t2); +select sum(t1.c1) from t1 where c1 not in (select c1 from t2); +explain(costs off) select * from t1 where c1 not in (select c1 from t3_null); +select * from t1 where c1 not in (select c1 from t3_null); +-- non-parallel results. +set enable_parallel = off; +select sum(t1.c1) from t1 where c1 not in (select c1 from t2); +select * from t1 where c1 not in (select c1 from t3_null); +drop table t1; +drop table t2; +drop table t3_null; +-- +-- End of Test Parallel Hash Left Anti Semi (Not-In) Join. +-- + -- -- Test alter ao/aocs table parallel_workers options --