From c3965d84782319ee665169c4ea7b77a24e6b94bc Mon Sep 17 00:00:00 2001 From: kang <35803862+ghkang98@users.noreply.github.com> Date: Mon, 17 Mar 2025 11:18:41 +0800 Subject: [PATCH] [fix] (streamload) fixed the issue of data loss due to concurrency when importing data from streamload (#48948) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is mainly to solve the multithreading problem caused by inconsistent visible order of EOS and data_queue variables in doris's streamload function and asyn_result_writer in the process_block process due to the compilation reordering of the ARM system or the weak memory order problem, which leads to data loss. Problem Summary: Mainly in the arm architecture, streamload has data loss problems. The transaction of importing data can be executed and submitted normally, but the NumberTotalRowshe NumberFilterRows in the returned load result are both zero [Uploading stream_load_lost_data.docx…]() ((https://github.com/user-attachments/files/19201955/stream_load.docx)) Co-authored-by: lik40 --- .../vec/sink/writer/async_result_writer.cpp | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 29877cb268affa..0cc37e3458bf77 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -108,41 +108,43 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi } DCHECK(_dependency); - if (_writer_status.ok()) { - while (true) { - ThreadCpuStopWatch cpu_time_stop_watch; - cpu_time_stop_watch.start(); - Defer defer {[&]() { - if (state && state->get_query_ctx()) { - state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time()); - } - }}; - if (!_eos && _data_queue.empty() && _writer_status.ok()) { - std::unique_lock l(_m); - while (!_eos && _data_queue.empty() && _writer_status.ok()) { - // Add 1s to check to avoid lost signal - _cv.wait_for(l, std::chrono::seconds(1)); - } + while (_writer_status.ok()) { + ThreadCpuStopWatch cpu_time_stop_watch; + cpu_time_stop_watch.start(); + Defer defer {[&]() { + if (state && state->get_query_ctx()) { + state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time()); + } + }}; + + //1) wait scan operator write data + { + std::unique_lock l(_m); + while (!_eos && _data_queue.empty() && _writer_status.ok()) { + // Add 1s to check to avoid lost signal + _cv.wait_for(l, std::chrono::seconds(1)); } + //check if eos or writer error if ((_eos && _data_queue.empty()) || !_writer_status.ok()) { _data_queue.clear(); break; } + } - auto block = _get_block_from_queue(); - auto status = write(state, *block); - if (!status.ok()) [[unlikely]] { - std::unique_lock l(_m); - _writer_status.update(status); - if (_is_finished()) { - _dependency->set_ready(); - } - break; + //2) get the block from data queue and write to downstream + auto block = _get_block_from_queue(); + auto status = write(state, *block); + if (!status.ok()) [[unlikely]] { + std::unique_lock l(_m); + _writer_status.update(status); + if (_is_finished()) { + _dependency->set_ready(); } - - _return_free_block(std::move(block)); + break; } + + _return_free_block(std::move(block)); } bool need_finish = false;