From cd467df94f2ed4a92a8007ac5582fcf1a05e29bf Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Fri, 22 Dec 2023 16:03:53 +0800 Subject: [PATCH] fix --- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 4 +++- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index fe7388735ed413..e33aec13c241d2 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -212,6 +212,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r _runtime_state->set_total_load_streams(request.total_load_streams); _runtime_state->set_num_local_sink(request.num_local_sink); + _use_global_rf = request.__isset.parallel_instances && (request.__isset.per_node_shared_scans && + !request.per_node_shared_scans.empty()); // 2. Build pipelines with operators in this fragment. auto root_pipeline = add_pipeline(); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines( @@ -985,7 +987,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, - request.__isset.parallel_instances)); + _use_global_rf)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index be0e26461debeb..6fe3488096431d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -168,6 +168,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext { // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. std::vector>> _tasks; + bool _use_global_rf = false; + // It is used to manage the lifecycle of RuntimeFilterMergeController std::vector> _merge_controller_handlers;