From bce6e301875e4abb030f3c763255e573ff93ab42 Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Sun, 30 Jul 2023 18:46:22 +0800 Subject: [PATCH 1/2] Retire GUC parallel_hash_enable_motion_broadcast It's added by history reasons as a workaround. We don't have exactly the right parallel feature at the time. It's neither reasonable to keep it, nor we have test cases for this wired GUC. Retire it and keep codes same with UPSTREAM. Authored-by: Zhang Mingli avamingli@gmail.com --- src/backend/cdb/cdbpath.c | 11 ++++--- src/backend/optimizer/path/joinpath.c | 41 ++------------------------- src/backend/optimizer/util/pathnode.c | 8 ++---- src/backend/utils/misc/guc_gp.c | 11 ------- src/include/cdb/cdbpath.h | 3 +- src/include/optimizer/pathnode.h | 3 +- src/include/utils/guc.h | 1 - src/include/utils/unsync_guc_name.h | 1 - 8 files changed, 12 insertions(+), 67 deletions(-) diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index a9a2b8ee1dd..bb27b826733 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -2871,8 +2871,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, List *inner_pathkeys, bool outer_require_existing_order, bool inner_require_existing_order, - bool parallel_aware, - bool uninterested_broadcast) + bool parallel_aware) { CdbpathMfjRel outer; CdbpathMfjRel inner; @@ -3699,7 +3698,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, else if (!small_rel->require_existing_order && small_rel->ok_to_replicate && ((!parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(large_rel->locus) < large_rel->bytes)) || - (parallel_aware && !uninterested_broadcast && (small_rel->bytes * CdbPathLocus_NumSegments(large_rel->locus) < large_rel->bytes)))) + (parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegments(large_rel->locus) < large_rel->bytes)))) { if (!parallel_aware) CdbPathLocus_MakeReplicated(&small_rel->move_to, @@ -3718,7 +3717,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, else if (!large_rel->require_existing_order && large_rel->ok_to_replicate && ((!parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(small_rel->locus) < small_rel->bytes)) || - (parallel_aware && !uninterested_broadcast && (large_rel->bytes * CdbPathLocus_NumSegments(small_rel->locus) < small_rel->bytes)))) + (parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegments(small_rel->locus) < small_rel->bytes)))) { if (!parallel_aware) CdbPathLocus_MakeReplicated(&large_rel->move_to, @@ -3749,7 +3748,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, else if (!small_rel->require_existing_order && small_rel->ok_to_replicate && ((!parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(large_rel->locus) < small_rel->bytes + large_rel->bytes)) || - (parallel_aware && !uninterested_broadcast && (small_rel->bytes * CdbPathLocus_NumSegments(large_rel->locus) < small_rel->bytes + large_rel->bytes)))) + (parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegments(large_rel->locus) < small_rel->bytes + large_rel->bytes)))) { if (!parallel_aware) CdbPathLocus_MakeReplicated(&small_rel->move_to, @@ -3765,7 +3764,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, else if (!large_rel->require_existing_order && large_rel->ok_to_replicate && ((!parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(small_rel->locus) < small_rel->bytes + large_rel->bytes)) || - (parallel_aware && !uninterested_broadcast && (large_rel->bytes * CdbPathLocus_NumSegments(small_rel->locus) < small_rel->bytes + large_rel->bytes)))) + (parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegments(small_rel->locus) < small_rel->bytes + large_rel->bytes)))) { if (!parallel_aware) CdbPathLocus_MakeReplicated(&large_rel->move_to, diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index c12317b2a89..55a9f4f44d4 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -1089,8 +1089,7 @@ try_hashjoin_path(PlannerInfo *root, extra->restrictlist, required_outer, extra->redistribution_clauses, - hashclauses, - false), + hashclauses), root); } else @@ -1147,40 +1146,6 @@ try_partial_hashjoin_path(PlannerInfo *root, if (!add_partial_path_precheck(joinrel, workspace.total_cost, NIL)) return; - /* - * CBDB_PARALLEL_FIXME - * Customers encounter an issue that when parallel hash, broadcast motion - * a smaller table may be worser than redistribute a big table. - * We add a path whic doesn't try broadcast if possible. - * And let the path cost decide which is better. - */ - if (parallel_hash) - { - hashpath = create_hashjoin_path(root, - joinrel, - jointype, - orig_jointype, - &workspace, - extra, - outer_path, - inner_path, - true, - extra->restrictlist, - NULL, - extra->redistribution_clauses, - hashclauses, - true); /* not use broadcast */ - if (hashpath && hashpath->parallel_safe) - add_partial_path(joinrel, hashpath); - } - - /* - * CBDB_PARALLEL_FIXME: - * We only want non-broadcast in parallel hash if the guc is set. - */ - if (parallel_hash && !parallel_hash_enable_motion_broadcast) - return; - hashpath = create_hashjoin_path(root, joinrel, jointype, @@ -1193,8 +1158,8 @@ try_partial_hashjoin_path(PlannerInfo *root, extra->restrictlist, NULL, extra->redistribution_clauses, - hashclauses, - false); + hashclauses); + /* Might be good enough to be worth trying and no motion, so let's try it. */ if (hashpath && hashpath->parallel_safe) add_partial_path(joinrel, hashpath); diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 709d2496015..9c50c24c121 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3911,7 +3911,6 @@ create_nestloop_path(PlannerInfo *root, NIL, outer_must_be_local, inner_must_be_local, - false, false); } @@ -4191,7 +4190,6 @@ create_mergejoin_path(PlannerInfo *root, innermotionkeys, preserve_outer_ordering, preserve_inner_ordering, - false, false); } @@ -4332,8 +4330,7 @@ create_hashjoin_path(PlannerInfo *root, List *restrict_clauses, Relids required_outer, List *redistribution_clauses, /* CDB */ - List *hashclauses, - bool uninterested_broadcast) /* GPDB parallel */ + List *hashclauses) { HashPath *pathnode; CdbPathLocus join_locus; @@ -4378,8 +4375,7 @@ create_hashjoin_path(PlannerInfo *root, NIL, outer_must_be_local, inner_must_be_local, - parallel_hash, - uninterested_broadcast); + parallel_hash); } if (CdbPathLocus_IsNull(join_locus)) diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 2e39cea261c..78b3471ce4c 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -303,7 +303,6 @@ bool optimizer_enable_indexjoin; bool optimizer_enable_motions_masteronly_queries; bool optimizer_enable_motions; bool optimizer_enable_motion_broadcast; -bool parallel_hash_enable_motion_broadcast; bool optimizer_enable_motion_gather; bool optimizer_enable_motion_redistribute; bool optimizer_enable_sort; @@ -2090,16 +2089,6 @@ struct config_bool ConfigureNamesBool_gp[] = true, NULL, NULL, NULL }, - { - {"parallel_hash_enable_motion_broadcast", PGC_USERSET, DEVELOPER_OPTIONS, - gettext_noop("Enable plans with Motion Broadcast operators in parallel hash join."), - NULL, - GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE - }, - ¶llel_hash_enable_motion_broadcast, - true, - NULL, NULL, NULL - }, { {"optimizer_enable_motion_gather", PGC_USERSET, DEVELOPER_OPTIONS, gettext_noop("Enable plans with Motion Gather operators in the optimizer."), diff --git a/src/include/cdb/cdbpath.h b/src/include/cdb/cdbpath.h index 8666958435a..e23cf1264c0 100644 --- a/src/include/cdb/cdbpath.h +++ b/src/include/cdb/cdbpath.h @@ -74,7 +74,6 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, List *inner_pathkeys, bool outer_require_existing_order, bool inner_require_existing_order, - bool parallel_aware, - bool uninterested_broadcast); /* for parallel hash join, do not use Broadcast if possible */ + bool parallel_aware); #endif /* CDBPATH_H */ diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index d711a78bc2f..c4d50754c0f 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -216,8 +216,7 @@ extern Path *create_hashjoin_path(PlannerInfo *root, List *restrict_clauses, Relids required_outer, List *redistribution_clauses, /*CDB*/ - List *hashclauses, - bool uninterested_broadcast); /* GPDB parallel */ + List *hashclauses); extern ProjectionPath *create_projection_path(PlannerInfo *root, RelOptInfo *rel, diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 0e1c15bcf64..b71f89a4a58 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -496,7 +496,6 @@ extern bool optimizer_enable_indexjoin; extern bool optimizer_enable_motions_masteronly_queries; extern bool optimizer_enable_motions; extern bool optimizer_enable_motion_broadcast; -extern bool parallel_hash_enable_motion_broadcast; extern bool optimizer_enable_motion_gather; extern bool optimizer_enable_motion_redistribute; extern bool optimizer_enable_sort; diff --git a/src/include/utils/unsync_guc_name.h b/src/include/utils/unsync_guc_name.h index d35e7f72320..67483db6481 100644 --- a/src/include/utils/unsync_guc_name.h +++ b/src/include/utils/unsync_guc_name.h @@ -424,7 +424,6 @@ "optimizer_enable_materialize", "optimizer_enable_mergejoin", "optimizer_enable_motion_broadcast", - "parallel_hash_enable_motion_broadcast", "optimizer_enable_motion_gather", "optimizer_enable_motion_redistribute", "optimizer_enable_motions", From 3c3c2273eab37948ae4346f12b3c04162fcdd7f2 Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Tue, 1 Aug 2023 22:30:41 +0800 Subject: [PATCH 2/2] Make BroadcastMotion if target plan's parallel < 1 when parallel aware. When there is a parallel ware join, we always make ParallelBroadcastMotion before to guarantee all data are replicated on segments across parallel processes, even if target side's parallel workers is 0, For example, inner(Hashed, workers=0) Join outer(HashedWorkers, workers=2) when parallel aware. If we ParallelBroadcastMotion the outer side to join with inner, it will get outer(ReplicatedWorkers) Join inner(Hashed) = Join Locus(HashedWorkers, workers=0). That will cause an Assert Failure in cdbpathlocus_parallel_join(). So, we should use BroadcastMotion instead of ParallelBroadcastMotion if target side's parallel workers is 0 when parallel aware. There are no differences between BroadcastMotion and ParallelBroadcastMotion when execution if target slice's parallel workers is 0. Another benefit is we could get a better Hashed locus instead of HashedWorkers for upper join with others directly without Motion. Authored-by: Zhang Mingli avamingli@gmail.com --- src/backend/cdb/cdbpath.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index bb27b826733..d671195ddf3 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -3655,6 +3655,8 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, { /* partitioned */ CdbpathMfjRel *large_rel = &outer; CdbpathMfjRel *small_rel = &inner; + int lp; /* larger rel parallel workers */ + int sp; /* small rel parallel workers */ /* Consider locus when parallel_ware. */ if(parallel_aware) @@ -3670,6 +3672,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, if (large_rel->bytes < small_rel->bytes) CdbSwap(CdbpathMfjRel *, large_rel, small_rel); + lp = CdbPathLocus_NumParallelWorkers(large_rel->locus); + sp = CdbPathLocus_NumParallelWorkers(small_rel->locus); + /* Both side are distribued in 1 segment and no parallel, it can join without motion. */ if (CdbPathLocus_NumSegments(large_rel->locus) == 1 && CdbPathLocus_NumSegments(small_rel->locus) == 1 && @@ -3700,7 +3705,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, ((!parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(large_rel->locus) < large_rel->bytes)) || (parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegments(large_rel->locus) < large_rel->bytes)))) { - if (!parallel_aware) + if (!parallel_aware || lp <= 1) CdbPathLocus_MakeReplicated(&small_rel->move_to, CdbPathLocus_NumSegments(large_rel->locus), large_rel->path->parallel_workers); @@ -3719,7 +3724,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, ((!parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(small_rel->locus) < small_rel->bytes)) || (parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegments(small_rel->locus) < small_rel->bytes)))) { - if (!parallel_aware) + if (!parallel_aware || sp <= 1) CdbPathLocus_MakeReplicated(&large_rel->move_to, CdbPathLocus_NumSegments(small_rel->locus), small_rel->path->parallel_workers); @@ -3750,7 +3755,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, ((!parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(large_rel->locus) < small_rel->bytes + large_rel->bytes)) || (parallel_aware && (small_rel->bytes * CdbPathLocus_NumSegments(large_rel->locus) < small_rel->bytes + large_rel->bytes)))) { - if (!parallel_aware) + if (!parallel_aware || lp <= 1) CdbPathLocus_MakeReplicated(&small_rel->move_to, CdbPathLocus_NumSegments(large_rel->locus), large_rel->path->parallel_workers); @@ -3766,7 +3771,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, ((!parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegmentsPlusParallelWorkers(small_rel->locus) < small_rel->bytes + large_rel->bytes)) || (parallel_aware && (large_rel->bytes * CdbPathLocus_NumSegments(small_rel->locus) < small_rel->bytes + large_rel->bytes)))) { - if (!parallel_aware) + if (!parallel_aware || sp <= 1) CdbPathLocus_MakeReplicated(&large_rel->move_to, CdbPathLocus_NumSegments(small_rel->locus), small_rel->path->parallel_workers); @@ -3808,7 +3813,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, else if (!small_rel->require_existing_order && small_rel->ok_to_replicate) { - if (!parallel_aware) + if (!parallel_aware || lp <= 1) CdbPathLocus_MakeReplicated(&small_rel->move_to, CdbPathLocus_NumSegments(large_rel->locus), large_rel->path->parallel_workers); @@ -3821,7 +3826,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, else if (!large_rel->require_existing_order && large_rel->ok_to_replicate) { - if (!parallel_aware) + if (!parallel_aware || sp <= 1) CdbPathLocus_MakeReplicated(&large_rel->move_to, CdbPathLocus_NumSegments(small_rel->locus), small_rel->path->parallel_workers);