diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 9db0db827d5..4832f25d1ac 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -1826,7 +1826,6 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, Assert(parallel_workers > 0); #endif - /* GPDB parallel, parallel_workers <= 1 is bogus */ if (parallel_workers > 1) { /* Generate a partial append path. */ diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index ce315e3058e..243585fc308 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -79,7 +79,8 @@ static List *plan_union_children(PlannerInfo *root, List *refnames_tlist, List **tlist_list); static Path *make_union_unique(SetOperationStmt *op, Path *path, List *tlist, - PlannerInfo *root); + PlannerInfo *root, + Relids relids); static void postprocess_setop_rel(PlannerInfo *root, RelOptInfo *rel); static bool choose_hashed_setop(PlannerInfo *root, List *groupClauses, Path *input_path, @@ -573,7 +574,7 @@ generate_recursion_path(SetOperationStmt *setOp, PlannerInfo *root, if (!setOp->all && CdbPathLocus_IsPartitioned(path->locus)) { path = make_motion_hash_all_targets(root, path, tlist); - path = make_union_unique(setOp, path, tlist, root); + path = make_union_unique(setOp, path, tlist, root, bms_union(lrel->relids, rrel->relids)); } add_path(result_rel, path, root); @@ -595,7 +596,7 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, ListCell *lc; List *pathlist = NIL; List *partial_pathlist = NIL; - bool partial_paths_valid = false; /* CBDB_PARALLEL_FIXME: temproary disable partial path */ + bool partial_paths_valid = true; bool consider_parallel = true; List *rellist; List *tlist_list; @@ -680,7 +681,7 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, /* CDB: Hash motion to collocate non-distinct tuples. */ path = make_motion_hash_all_targets(root, path, tlist); } - path = make_union_unique(op, path, tlist, root); + path = make_union_unique(op, path, tlist, root, relids); } add_path(result_rel, path, root); @@ -730,7 +731,7 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, ppath = (Path *) create_append_path(root, result_rel, NIL, partial_pathlist, NIL, NULL, - parallel_workers, enable_parallel_append, + parallel_workers, false /* enable_parallel_append */, -1); /* CBDB_PARALLEL_FIXME: we disable pg styple Gather/GatherMerge node */ #if 0 @@ -739,8 +740,15 @@ generate_union_paths(SetOperationStmt *op, PlannerInfo *root, result_rel->reltarget, NULL, NULL); #endif if (!op->all) - ppath = make_union_unique(op, ppath, tlist, root); - add_path(result_rel, ppath, root); + { + /* CDB: Hash motion to collocate non-distinct tuples. */ + if (CdbPathLocus_IsPartitioned(ppath->locus)) + { + ppath = make_motion_hash_all_targets(root, ppath, tlist); + } + ppath = make_union_unique(op, ppath, tlist, root, relids); + } + add_partial_path(result_rel, ppath); } /* Undo effects of possibly forcing tuple_fraction to 0 */ @@ -1010,9 +1018,9 @@ plan_union_children(PlannerInfo *root, */ static Path * make_union_unique(SetOperationStmt *op, Path *path, List *tlist, - PlannerInfo *root) + PlannerInfo *root, Relids relids) { - RelOptInfo *result_rel = fetch_upper_rel(root, UPPERREL_SETOP, NULL); + RelOptInfo *result_rel = fetch_upper_rel(root, UPPERREL_SETOP, relids); List *groupList; double dNumGroups; diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out index 6ed0b1b6bb0..d6b8983eb2f 100644 --- a/src/test/regress/expected/cbdb_parallel.out +++ b/src/test/regress/expected/cbdb_parallel.out @@ -2006,28 +2006,32 @@ explain (locus, costs off) select * from rt1 union all select * from t2; Optimizer: Postgres query optimizer (15 rows) --- SingleQE as subquery seems cannot produce partial_pathlist and don't have chance to parallel append. +-- partial subpath under UNION ALL explain (locus, costs off) select a from rt1 union all select count(*) as a from sq1; QUERY PLAN ------------------------------------------------------ Append Locus: Entry - -> Gather Motion 1:1 (slice1; segments: 1) + -> Gather Motion 3:1 (slice1; segments: 3) Locus: Entry -> Subquery Scan on "*SELECT* 1" - Locus: SegmentGeneral - -> Seq Scan on rt1 - Locus: SegmentGeneral + Locus: SegmentGeneralWorkers + Parallel Workers: 3 + -> Parallel Seq Scan on rt1 + Locus: SegmentGeneralWorkers + Parallel Workers: 3 -> Finalize Aggregate Locus: Entry - -> Gather Motion 3:1 (slice2; segments: 3) + -> Gather Motion 6:1 (slice2; segments: 6) Locus: Entry -> Partial Aggregate - Locus: Hashed - -> Seq Scan on sq1 - Locus: Hashed + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on sq1 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer -(17 rows) +(21 rows) -- SegmentGeneralWorkers + General = SegmentGeneralWorkers explain (locus, costs off) select a from rt1 union all select a from generate_series(1, 1000) a; @@ -3264,6 +3268,153 @@ select distinct a from t_distinct_1; -- -- End of test Parallel DISTINCT -- +-- +-- Test Parallel UNION +-- +set enable_parallel = off; +explain(costs off) +select distinct a from t_distinct_0 union select distinct b from t_distinct_0; + QUERY PLAN +----------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + -> HashAggregate + Group Key: t_distinct_0.a + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: t_distinct_0.a + -> Append + -> HashAggregate + Group Key: t_distinct_0.a + -> Redistribute Motion 3:3 (slice3; segments: 3) + Hash Key: t_distinct_0.a + -> HashAggregate + Group Key: t_distinct_0.a + -> Seq Scan on t_distinct_0 + -> HashAggregate + Group Key: t_distinct_0_1.b + -> Redistribute Motion 3:3 (slice4; segments: 3) + Hash Key: t_distinct_0_1.b + -> HashAggregate + Group Key: t_distinct_0_1.b + -> Seq Scan on t_distinct_0 t_distinct_0_1 + Optimizer: Postgres query optimizer +(21 rows) + +set enable_parallel = on; +set enable_groupagg = off; +set enable_hashagg = on; +explain(costs off) +select distinct a from t_distinct_0 union select distinct b from t_distinct_0; + QUERY PLAN +-------------------------------------------------------------------------------------------- + Gather Motion 6:1 (slice1; segments: 6) + -> HashAggregate + Group Key: t_distinct_0.a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: t_distinct_0.a + Hash Module: 3 + -> Append + -> HashAggregate + Group Key: t_distinct_0.a + -> Redistribute Motion 6:6 (slice3; segments: 6) + Hash Key: t_distinct_0.a + Hash Module: 3 + -> Streaming HashAggregate + Group Key: t_distinct_0.a + -> Parallel Seq Scan on t_distinct_0 + -> HashAggregate + Group Key: t_distinct_0_1.b + -> Redistribute Motion 6:6 (slice4; segments: 6) + Hash Key: t_distinct_0_1.b + Hash Module: 3 + -> Streaming HashAggregate + Group Key: t_distinct_0_1.b + -> Parallel Seq Scan on t_distinct_0 t_distinct_0_1 + Optimizer: Postgres query optimizer +(24 rows) + +reset enable_groupagg; +set enable_hashagg = off; +set enable_groupagg = on; +explain(costs off) +select distinct a from t_distinct_0 union select distinct b from t_distinct_0; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------- + Gather Motion 6:1 (slice1; segments: 6) + -> Unique + Group Key: t_distinct_0.a + -> Sort + Sort Key: t_distinct_0.a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: t_distinct_0.a + Hash Module: 3 + -> Append + -> GroupAggregate + Group Key: t_distinct_0.a + -> Sort + Sort Key: t_distinct_0.a + -> Redistribute Motion 6:6 (slice3; segments: 6) + Hash Key: t_distinct_0.a + Hash Module: 3 + -> GroupAggregate + Group Key: t_distinct_0.a + -> Sort + Sort Key: t_distinct_0.a + -> Parallel Seq Scan on t_distinct_0 + -> GroupAggregate + Group Key: t_distinct_0_1.b + -> Sort + Sort Key: t_distinct_0_1.b + -> Redistribute Motion 6:6 (slice4; segments: 6) + Hash Key: t_distinct_0_1.b + Hash Module: 3 + -> GroupAggregate + Group Key: t_distinct_0_1.b + -> Sort + Sort Key: t_distinct_0_1.b + -> Parallel Seq Scan on t_distinct_0 t_distinct_0_1 + Optimizer: Postgres query optimizer +(34 rows) + +reset enable_groupagg; +reset enable_hashagg; +explain(costs off) +select distinct a from t_distinct_0 union select distinct b from t_distinct_0; + QUERY PLAN +-------------------------------------------------------------------------------------------------- + Gather Motion 6:1 (slice1; segments: 6) + -> HashAggregate + Group Key: t_distinct_0.a + -> Redistribute Motion 6:6 (slice2; segments: 6) + Hash Key: t_distinct_0.a + Hash Module: 3 + -> Append + -> GroupAggregate + Group Key: t_distinct_0.a + -> Sort + Sort Key: t_distinct_0.a + -> Redistribute Motion 6:6 (slice3; segments: 6) + Hash Key: t_distinct_0.a + Hash Module: 3 + -> Streaming HashAggregate + Group Key: t_distinct_0.a + -> Parallel Seq Scan on t_distinct_0 + -> GroupAggregate + Group Key: t_distinct_0_1.b + -> Sort + Sort Key: t_distinct_0_1.b + -> Redistribute Motion 6:6 (slice4; segments: 6) + Hash Key: t_distinct_0_1.b + Hash Module: 3 + -> Streaming HashAggregate + Group Key: t_distinct_0_1.b + -> Parallel Seq Scan on t_distinct_0 t_distinct_0_1 + Optimizer: Postgres query optimizer +(28 rows) + +reset enable_parallel; +-- +-- End of test Parallel UNION +-- -- start_ignore drop schema test_parallel cascade; -- end_ignore diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql index 74a60e6ed2d..95d3b6b50d0 100644 --- a/src/test/regress/sql/cbdb_parallel.sql +++ b/src/test/regress/sql/cbdb_parallel.sql @@ -619,7 +619,7 @@ explain (locus, costs off) select * from rt1 union all select * from rt2; explain (locus, costs off) select * from rt1 union all select * from t1; -- SegmentGeneralWorkers (Converted to Strewn, Limited on One Segment) + Hashed = Strewn explain (locus, costs off) select * from rt1 union all select * from t2; --- SingleQE as subquery seems cannot produce partial_pathlist and don't have chance to parallel append. +-- partial subpath under UNION ALL explain (locus, costs off) select a from rt1 union all select count(*) as a from sq1; -- SegmentGeneralWorkers + General = SegmentGeneralWorkers explain (locus, costs off) select a from rt1 union all select a from generate_series(1, 1000) a; @@ -1049,6 +1049,31 @@ select distinct a from t_distinct_1; -- End of test Parallel DISTINCT -- +-- +-- Test Parallel UNION +-- +set enable_parallel = off; +explain(costs off) +select distinct a from t_distinct_0 union select distinct b from t_distinct_0; +set enable_parallel = on; +set enable_groupagg = off; +set enable_hashagg = on; +explain(costs off) +select distinct a from t_distinct_0 union select distinct b from t_distinct_0; +reset enable_groupagg; +set enable_hashagg = off; +set enable_groupagg = on; +explain(costs off) +select distinct a from t_distinct_0 union select distinct b from t_distinct_0; +reset enable_groupagg; +reset enable_hashagg; +explain(costs off) +select distinct a from t_distinct_0 union select distinct b from t_distinct_0; +reset enable_parallel; +-- +-- End of test Parallel UNION +-- + -- start_ignore drop schema test_parallel cascade; -- end_ignore