Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,27 +154,45 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
}
}

// If the last block is sent successfuly, then call finish to clear the buffer or commit
// transactions.
// Using lock to make sure the writer status is not modified
// There is a unique ptr err_msg in Status, if it is modified, the unique ptr
// maybe released. And it will core because use after free.
std::lock_guard l(_m);
bool need_finish = false;
{
// If the last block is sent successfuly, then call finish to clear the buffer or commit
// transactions.
// Using lock to make sure the writer status is not modified
// There is a unique ptr err_msg in Status, if it is modified, the unique ptr
// maybe released. And it will core because use after free.
std::lock_guard l(_m);
if (_writer_status.ok() && _eos) {
need_finish = true;
}
}
// eos only means the last block is input to the queue and there is no more block to be added,
// it is not sure that the block is written to stream.
if (_writer_status.ok() && _eos) {
_writer_status = finish(state);
if (need_finish) {
// Should not call finish in lock because it may hang, and it will lock _m too long.
// And get_writer_status will also need this lock, it will block pipeline exec thread.
Status st = finish(state);
std::lock_guard l(_m);
_writer_status = st;
}
Status st = Status::OK();
{
std::lock_guard l(_m);
st = _writer_status;
}
// should set _finish_dependency first, as close function maybe blocked by wait_close of execution_timeout
_set_ready_to_finish();

Status close_st = close(_writer_status);
// If it is already failed before, then not update the write status so that we could get
// the real reason.
if (_writer_status.ok()) {
_writer_status = close_st;
Status close_st = close(st);
{
// If it is already failed before, then not update the write status so that we could get
// the real reason.
std::lock_guard l(_m);
if (_writer_status.ok()) {
_writer_status = close_st;
}
_writer_thread_closed = true;
}
_writer_thread_closed = true;
// should set _finish_dependency first, as close function maybe blocked by wait_close of execution_timeout
_set_ready_to_finish();
}

void AsyncResultWriter::_set_ready_to_finish() {
Expand Down