diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 16377027132528..c57d38076241c2 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -1403,6 +1403,11 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_io_ctx && _io_ctx->should_stop) { + *eof = true; + *read_rows = 0; + return Status::OK(); + } if (_push_down_agg_type == TPushAggOp::type::COUNT) { auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size); diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 8c5818cba9ffe1..309aed96a8cf7f 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -68,7 +68,7 @@ class PipScannerContext : public vectorized::ScannerContext { { std::unique_lock l(*_queue_mutexs[id]); if (_blocks_queues[id].empty()) { - *eos = _is_finished || _should_stop; + *eos = done(); return Status::OK(); } if (_process_status.is()) { diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 8a6f6de6e65c43..99f645ca9e574b 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -273,7 +273,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo int num_running_scanners = _num_running_scanners; bool is_scheduled = false; - if (to_be_schedule && _num_running_scanners == 0) { + if (!done() && to_be_schedule && _num_running_scanners == 0) { is_scheduled = true; auto state = _scanner_scheduler->submit(shared_from_this()); if (state.ok()) { @@ -287,8 +287,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo if (wait) { // scanner batch wait time SCOPED_TIMER(_scanner_wait_batch_timer); - while (!(!_blocks_queue.empty() || _is_finished || !status().ok() || - state->is_cancelled())) { + while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) { if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) { LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue << ", serving_blocks_num " << serving_blocks_num @@ -330,7 +329,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo } } } else { - *eos = _is_finished; + *eos = done(); } } @@ -400,8 +399,7 @@ void ScannerContext::dec_num_scheduling_ctx() { void ScannerContext::set_ready_to_finish() { // `_should_stop == true` means this task has already ended and wait for pending finish now. - if (_finish_dependency && _should_stop && _num_running_scanners == 0 && - _num_scheduling_ctx == 0) { + if (_finish_dependency && done() && _num_running_scanners == 0 && _num_scheduling_ctx == 0) { _finish_dependency->set_ready(); } } @@ -524,6 +522,9 @@ std::string ScannerContext::debug_string() { void ScannerContext::reschedule_scanner_ctx() { std::lock_guard l(_transfer_lock); + if (done()) { + return; + } auto state = _scanner_scheduler->submit(shared_from_this()); //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times? if (state.ok()) { @@ -546,7 +547,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { _num_running_scanners--; set_ready_to_finish(); - if (should_be_scheduled()) { + if (!done() && should_be_scheduled()) { auto state = _scanner_scheduler->submit(shared_from_this()); if (state.ok()) { _num_scheduling_ctx++; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 2d1328c0feae9a..a7c0c3c4062ca8 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -113,12 +113,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { if (state->is_cancelled()) { return Status::Cancelled("cancelled"); } - + *eof = *eof || _should_stop; // set eof to true if per scanner limit is reached // currently for query: ORDER BY key LIMIT n - if (_limit > 0 && _num_rows_return >= _limit) { - *eof = true; - } + *eof = *eof || (_limit > 0 && _num_rows_return >= _limit); return Status::OK(); }