From 68397d6e27d698fceac40a4cf369bc0580d2741f Mon Sep 17 00:00:00 2001 From: yiguolei Date: Sun, 31 Mar 2024 19:42:01 +0800 Subject: [PATCH 1/2] [bugfix](asyncwriter) async writer's lock should not include finish or close method --- .../vec/sink/writer/async_result_writer.cpp | 47 +++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index fb729640211ddf..149f758ece1555 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -154,27 +154,46 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi } } - // If the last block is sent successfuly, then call finish to clear the buffer or commit - // transactions. - // Using lock to make sure the writer status is not modified - // There is a unique ptr err_msg in Status, if it is modified, the unique ptr - // maybe released. And it will core because use after free. - std::lock_guard l(_m); + bool need_finish = false; + { + // If the last block is sent successfuly, then call finish to clear the buffer or commit + // transactions. + // Using lock to make sure the writer status is not modified + // There is a unique ptr err_msg in Status, if it is modified, the unique ptr + // maybe released. And it will core because use after free. + std::lock_guard l(_m); + if (_writer_status.ok() && _eos) { + need_finish = true; + } + } // eos only means the last block is input to the queue and there is no more block to be added, // it is not sure that the block is written to stream. - if (_writer_status.ok() && _eos) { - _writer_status = finish(state); + if (need_finish) { + // Should not call finish in lock because it may hang, and it will lock _m too long. + // And get_writer_status will also need this lock, it will block pipeline exec thread. + Status st = finish(state); + std::lock_guard l(_m); + _writer_status = st; } // should set _finish_dependency first, as close function maybe blocked by wait_close of execution_timeout _set_ready_to_finish(); - Status close_st = close(_writer_status); - // If it is already failed before, then not update the write status so that we could get - // the real reason. - if (_writer_status.ok()) { - _writer_status = close_st; + Status st = Status::OK(); + { + std::lock_guard l(_m); + st = _writer_status; + } + + Status close_st = close(st); + { + // If it is already failed before, then not update the write status so that we could get + // the real reason. + std::lock_guard l(_m); + if (_writer_status.ok()) { + _writer_status = close_st; + } + _writer_thread_closed = true; } - _writer_thread_closed = true; } void AsyncResultWriter::_set_ready_to_finish() { From 0fcc5670be4538abd597454d63b73cc08e648581 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Sun, 31 Mar 2024 21:22:34 +0800 Subject: [PATCH 2/2] f --- be/src/vec/sink/writer/async_result_writer.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 149f758ece1555..9a84f374464a10 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -175,9 +175,6 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi std::lock_guard l(_m); _writer_status = st; } - // should set _finish_dependency first, as close function maybe blocked by wait_close of execution_timeout - _set_ready_to_finish(); - Status st = Status::OK(); { std::lock_guard l(_m); @@ -194,6 +191,8 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi } _writer_thread_closed = true; } + // should set _finish_dependency first, as close function maybe blocked by wait_close of execution_timeout + _set_ready_to_finish(); } void AsyncResultWriter::_set_ready_to_finish() {