diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 81a84451e89c0a..a8e62f4f6d3524 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -106,6 +106,7 @@ #include "runtime/thread_context.h" #include "service/backend_options.h" #include "util/container_util.hpp" +#include "util/countdown_latch.h" #include "util/debug_util.h" #include "util/uid_util.h" #include "vec/common/sort/heap_sorter.h" @@ -513,27 +514,28 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag target_size > _runtime_state->query_options().parallel_prepare_threshold)) { // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads std::vector prepare_status(target_size); - std::mutex m; - std::condition_variable cv; - int prepare_done = 0; + int submitted_tasks = 0; + Status submit_status; + CountDownLatch latch((int)target_size); for (size_t i = 0; i < target_size; i++) { - RETURN_IF_ERROR(thread_pool->submit_func([&, i]() { + submit_status = thread_pool->submit_func([&, i]() { SCOPED_ATTACH_TASK(_query_ctx.get()); prepare_status[i] = pre_and_submit(i, this); - std::unique_lock lock(m); - prepare_done++; - if (prepare_done == target_size) { - cv.notify_one(); - } - })); - } - std::unique_lock lock(m); - if (prepare_done != target_size) { - cv.wait(lock); - for (size_t i = 0; i < target_size; i++) { - if (!prepare_status[i].ok()) { - return prepare_status[i]; - } + latch.count_down(); + }); + if (LIKELY(submit_status.ok())) { + submitted_tasks++; + } else { + break; + } + } + latch.arrive_and_wait(target_size - submitted_tasks); + if (UNLIKELY(!submit_status.ok())) { + return submit_status; + } + for (int i = 0; i < submitted_tasks; i++) { + if (!prepare_status[i].ok()) { + return prepare_status[i]; } } } else { diff --git a/be/src/util/countdown_latch.h b/be/src/util/countdown_latch.h index a41a417d20f707..b27737e8bb1d16 100644 --- a/be/src/util/countdown_latch.h +++ b/be/src/util/countdown_latch.h @@ -91,6 +91,13 @@ class CountDownLatch { } } + // decrements the internal counter by n and blocks the calling thread until the counter reaches zero. + void arrive_and_wait(uint64_t n) { + DCHECK_GE(n, 0); + count_down(n); + wait(); + } + uint64_t count() const { std::lock_guard lock(_lock); return _count;