Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 28 additions & 26 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,41 +108,43 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
}

DCHECK(_dependency);
if (_writer_status.ok()) {
while (true) {
ThreadCpuStopWatch cpu_time_stop_watch;
cpu_time_stop_watch.start();
Defer defer {[&]() {
if (state && state->get_query_ctx()) {
state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
}
}};
if (!_eos && _data_queue.empty() && _writer_status.ok()) {
std::unique_lock l(_m);
while (!_eos && _data_queue.empty() && _writer_status.ok()) {
// Add 1s to check to avoid lost signal
_cv.wait_for(l, std::chrono::seconds(1));
}
while (_writer_status.ok()) {
ThreadCpuStopWatch cpu_time_stop_watch;
cpu_time_stop_watch.start();
Defer defer {[&]() {
if (state && state->get_query_ctx()) {
state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
}
}};

//1) wait scan operator write data
{
std::unique_lock l(_m);
while (!_eos && _data_queue.empty() && _writer_status.ok()) {
// Add 1s to check to avoid lost signal
_cv.wait_for(l, std::chrono::seconds(1));
}

//check if eos or writer error
if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
_data_queue.clear();
break;
}
}

auto block = _get_block_from_queue();
auto status = write(state, *block);
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
_writer_status.update(status);
if (_is_finished()) {
_dependency->set_ready();
}
break;
//2) get the block from data queue and write to downstream
auto block = _get_block_from_queue();
auto status = write(state, *block);
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
_writer_status.update(status);
if (_is_finished()) {
_dependency->set_ready();
}

_return_free_block(std::move(block));
break;
}

_return_free_block(std::move(block));
}

bool need_finish = false;
Expand Down
Loading