diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 1a4a7866540b8a..38f09804a99e53 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -362,6 +362,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
     }
 
+    if (_is_receiver_eof(id)) {
+        return Status::EndOfFile("receiver eof");
+    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 99aac70b1e5811..bf9e201f837022 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -106,6 +106,8 @@ class PipelineFragmentContext : public TaskExecutionContext {
             const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR,
             const std::string& msg = "");
 
+    void set_reach_limit() { _query_ctx->set_reach_limit(); }
+
     // TODO: Support pipeline runtime filter
     QueryContext* get_query_context() { return _query_ctx.get(); }
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index d387680310452a..1dd2f7dfd74497 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -281,7 +281,11 @@ void TaskScheduler::_do_work(size_t index) {
 
         auto status = Status::OK();
         try {
-            status = task->execute(&eos);
+            if (task->query_context()->reach_limit()) {
+                eos = true;
+            } else {
+                status = task->execute(&eos);
+            }
         } catch (const Exception& e) {
             status = e.to_status();
         }
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 90ebbff37a26f2..d3738863e1aa95 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1060,8 +1060,12 @@ void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
 
         auto itr = _pipeline_map.find(instance_id);
         if (itr != _pipeline_map.end()) {
-            // calling PipelineFragmentContext::cancel
-            itr->second->cancel(reason, msg);
+            if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+                itr->second->set_reach_limit();
+            } else {
+                // calling PipelineFragmentContext::cancel
+                itr->second->cancel(reason, msg);
+            }
         } else {
             LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
                          << " to cancel";
@@ -1069,8 +1073,12 @@ void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
     } else {
         auto itr = _fragment_instance_map.find(instance_id);
         if (itr != _fragment_instance_map.end()) {
-            // calling PlanFragmentExecutor::cancel
-            itr->second->cancel(reason, msg);
+            if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+                itr->second->set_reach_limit();
+            } else {
+                // calling PlanFragmentExecutor::cancel
+                itr->second->cancel(reason, msg);
+            }
         } else {
             LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id)
                          << " to cancel";
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 41817e5308d1b5..ffd99547fead50 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -117,6 +117,8 @@ class PlanFragmentExecutor : public TaskExecutionContext {
     // in open()/get_next().
     void close();
 
+    void set_reach_limit() { _query_ctx->set_reach_limit(); }
+
     // Initiate cancellation. Must not be called until after prepare() returned.
     void cancel(const PPlanFragmentCancelReason& reason = PPlanFragmentCancelReason::INTERNAL_ERROR,
                 const std::string& msg = "");
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 6d392c561759c9..52ac3f694eed5f 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -105,6 +105,8 @@ class QueryContext {
     void set_ready_to_execute(bool is_cancelled);
 
     [[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); }
+    [[nodiscard]] bool reach_limit() const { return _reach_limit.load(); }
+    void set_reach_limit() { _reach_limit = true; }
     bool cancel(bool v, std::string msg, Status new_status, int fragment_id = -1);
 
     void set_exec_status(Status new_status) {
@@ -253,6 +255,7 @@ class QueryContext {
     // And all fragments of this query will start execution when this is set to true.
     std::atomic<bool> _ready_to_execute {false};
     std::atomic<bool> _is_cancelled {false};
+    std::atomic<bool> _reach_limit {false};
 
     std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
     std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 16377027132528..c57d38076241c2 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1403,6 +1403,11 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int
 }
 
 Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    if (_io_ctx && _io_ctx->should_stop) {
+        *eof = true;
+        *read_rows = 0;
+        return Status::OK();
+    }
     if (_push_down_agg_type == TPushAggOp::type::COUNT) {
         auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 8c5818cba9ffe1..309aed96a8cf7f 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -68,7 +68,7 @@ class PipScannerContext : public vectorized::ScannerContext {
         {
             std::unique_lock l(*_queue_mutexs[id]);
             if (_blocks_queues[id].empty()) {
-                *eos = _is_finished || _should_stop;
+                *eos = done();
                 return Status::OK();
             }
             if (_process_status.is<ErrorCode::CANCELLED>()) {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 8a6f6de6e65c43..99f645ca9e574b 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -273,7 +273,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     int num_running_scanners = _num_running_scanners;
 
     bool is_scheduled = false;
-    if (to_be_schedule && _num_running_scanners == 0) {
+    if (!done() && to_be_schedule && _num_running_scanners == 0) {
         is_scheduled = true;
         auto state = _scanner_scheduler->submit(shared_from_this());
         if (state.ok()) {
@@ -287,8 +287,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     if (wait) {
         // scanner batch wait time
         SCOPED_TIMER(_scanner_wait_batch_timer);
-        while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
-                 state->is_cancelled())) {
+        while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) {
             if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
                 LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
                           << ", serving_blocks_num " << serving_blocks_num
@@ -330,7 +329,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             }
         }
     } else {
-        *eos = _is_finished;
+        *eos = done();
     }
 
 }
@@ -400,8 +399,7 @@ void ScannerContext::dec_num_scheduling_ctx() {
 
 void ScannerContext::set_ready_to_finish() {
     // `_should_stop == true` means this task has already ended and wait for pending finish now.
-    if (_finish_dependency && _should_stop && _num_running_scanners == 0 &&
-        _num_scheduling_ctx == 0) {
+    if (_finish_dependency && done() && _num_running_scanners == 0 && _num_scheduling_ctx == 0) {
         _finish_dependency->set_ready();
     }
 }
@@ -524,6 +522,9 @@ std::string ScannerContext::debug_string() {
 
 void ScannerContext::reschedule_scanner_ctx() {
     std::lock_guard l(_transfer_lock);
+    if (done()) {
+        return;
+    }
     auto state = _scanner_scheduler->submit(shared_from_this());
     //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
     if (state.ok()) {
@@ -546,7 +547,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
     _num_running_scanners--;
     set_ready_to_finish();
 
-    if (should_be_scheduled()) {
+    if (!done() && should_be_scheduled()) {
         auto state = _scanner_scheduler->submit(shared_from_this());
         if (state.ok()) {
             _num_scheduling_ctx++;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 2d1328c0feae9a..a7c0c3c4062ca8 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -113,12 +113,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
     if (state->is_cancelled()) {
         return Status::Cancelled("cancelled");
     }
-
+    *eof = *eof || _should_stop;
     // set eof to true if per scanner limit is reached
     // currently for query: ORDER BY key LIMIT n
-    if (_limit > 0 && _num_rows_return >= _limit) {
-        *eof = true;
-    }
+    *eof = *eof || (_limit > 0 && _num_rows_return >= _limit);
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 4d55e0fcb064e8..6b1a1fe2ad299c 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -639,7 +639,12 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         // 1. calculate range
        // 2. dispatch rows to channel
     }
-    return Status::OK();
+    for (auto channel : _channels) {
+        if (!channel->is_receiver_eof()) {
+            return Status::OK();
+        }
+    }
+    return Status::EndOfFile("all data stream channels EOF");
 }
 
 Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3c8dd8c1dc80e6..2aef6ba0a1b05a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1346,7 +1346,7 @@ public RowBatch getNext() throws Exception {
             this.returnedAllResults = true;
 
             // if this query is a block query do not cancel.
-            Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
+            long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
             boolean hasLimit = numLimitRows > 0;
             if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
                 LOG.debug("no block query, return num >= limit rows, need cancel");
@@ -1359,6 +1359,12 @@ public RowBatch getNext() throws Exception {
         } else if (resultBatch.getBatch() != null) {
             numReceivedRows += resultBatch.getBatch().getRowsSize();
         }
+        long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
+        if (numLimitRows > 0) {
+            if (numReceivedRows >= numLimitRows) {
+                cleanRemoteFragmentsAsync(Types.PPlanFragmentCancelReason.LIMIT_REACH);
+            }
+        }
         return resultBatch;
     }
 
@@ -1475,6 +1481,18 @@ private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, long b
         executionProfile.onCancel();
     }
 
+    private void cleanRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cleanReason) {
+        if (enablePipelineEngine) {
+            for (PipelineExecContext ctx : pipelineExecContexts.values()) {
+                ctx.cleanFragmentInstance(cleanReason);
+            }
+        } else {
+            for (BackendExecState backendExecState : backendExecStates) {
+                backendExecState.cleanFragmentInstance(cleanReason);
+            }
+        }
+    }
+
     private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) {
         if (enablePipelineEngine) {
             for (PipelineExecContext ctx : pipelineExecContexts.values()) {
@@ -3014,6 +3032,18 @@ public synchronized void printProfile(StringBuilder builder) {
             this.instanceProfile.prettyPrint(builder, "");
         }
 
+        public synchronized void cleanFragmentInstance(Types.PPlanFragmentCancelReason cleanReason) {
+            if (!initiated || done || hasCanceled) {
+                return;
+            }
+            try {
+                BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+                        fragmentInstanceId(), cleanReason);
+            } catch (RpcException ignored) {
+                // do nothing
+            }
+        }
+
         // cancel the fragment instance.
         // return true if cancel success. Otherwise, return false
         public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
@@ -3213,6 +3243,22 @@ public synchronized void printProfile(StringBuilder builder) {
             });
         }
 
+        // clean all fragment instances, inorder to stop the running instances when query is finished.
+        // for query with limit statement.
+        public synchronized void cleanFragmentInstance(Types.PPlanFragmentCancelReason cleanReason) {
+            if (!initiated || done || hasCanceled) {
+                return;
+            }
+            for (TPipelineInstanceParams localParam : rpcParams.local_params) {
+                try {
+                    BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+                            localParam.fragment_instance_id, cleanReason);
+                } catch (RpcException ignored) {
+                    // do nothing
+                }
+            }
+        }
+
         // cancel all fragment instances.
         // return true if cancel success. Otherwise, return false