From f1b0bfe1faccb3939ec977c727e640ccd777ad95 Mon Sep 17 00:00:00 2001 From: lik40 Date: Fri, 14 Mar 2025 20:12:33 +0800 Subject: [PATCH] [fix] (streamload) fixed the issue of data loss due to concurrency when importing data from streamload --- .../vec/sink/writer/async_result_writer.cpp | 56 ++++++++++--------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 4e746c6f1c204e..c4b3689e6f5fec 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -130,42 +130,44 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi } DCHECK(_dependency); - if (_writer_status.ok()) { - while (true) { - ThreadCpuStopWatch cpu_time_stop_watch; - cpu_time_stop_watch.start(); - Defer defer {[&]() { - if (state && state->get_query_ctx()) { - state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms( - cpu_time_stop_watch.elapsed_time()); - } - }}; - if (!_eos && _data_queue.empty() && _writer_status.ok()) { - std::unique_lock l(_m); - while (!_eos && _data_queue.empty() && _writer_status.ok()) { - // Add 1s to check to avoid lost signal - _cv.wait_for(l, std::chrono::seconds(1)); - } + while (_writer_status.ok()) { + ThreadCpuStopWatch cpu_time_stop_watch; + cpu_time_stop_watch.start(); + Defer defer {[&]() { + if (state && state->get_query_ctx()) { + state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms( + cpu_time_stop_watch.elapsed_time()); } + }}; + //1) wait scan operator write data + { + std::unique_lock l(_m); + while (!_eos && _data_queue.empty() && _writer_status.ok()) { + // Add 1s to check to avoid lost signal + _cv.wait_for(l, std::chrono::seconds(1)); + } + + //check if eos or writer error if ((_eos && _data_queue.empty()) || !_writer_status.ok()) { _data_queue.clear(); break; } + } - auto block = _get_block_from_queue(); - auto status = 
write(state, *block); - if (!status.ok()) [[unlikely]] { - std::unique_lock l(_m); - _writer_status.update(status); - if (_is_finished()) { - _dependency->set_ready(); - } - break; + //2) get the block from data queue and write to downstream + auto block = _get_block_from_queue(); + auto status = write(state, *block); + if (!status.ok()) [[unlikely]] { + std::unique_lock l(_m); + _writer_status.update(status); + if (_is_finished()) { + _dependency->set_ready(); } - - _return_free_block(std::move(block)); + break; } + + _return_free_block(std::move(block)); } bool need_finish = false;