diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 68d899e0c48b8d..98b262a4e20a27 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -300,6 +300,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         _instance_to_sending_by_pipeline[id] = true;
     }
 
+    if (_is_receiver_eof(id)) {
+        return Status::EndOfFile("receiver eof");
+    }
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 1b44894b84231a..5a5db8c49060d1 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -90,6 +90,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFragmentContext> {
 
+    void set_reach_limit() { _query_ctx->set_reach_limit(); }
+
     // TODO: Support pipeline runtime filter
     QueryContext* get_query_context() { return _query_ctx.get(); }
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index aab5e1075556c3..06057ea5a9073b 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -264,7 +264,11 @@ void TaskScheduler::_do_work(size_t index) {
         auto status = Status::OK();
 
         try {
-            status = task->execute(&eos);
+            if (task->query_context()->reach_limit()) {
+                eos = true;
+            } else {
+                status = task->execute(&eos);
+            }
         } catch (const Exception& e) {
             status = e.to_status();
         }
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7e7330a43ecd21..9dc34559174f65 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1008,7 +1008,11 @@ void FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancel
     }
     if (pipeline_fragment_ctx) {
         find_the_fragment = true;
-        pipeline_fragment_ctx->cancel(reason, msg);
+        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+            pipeline_fragment_ctx->set_reach_limit();
+        } else {
+            pipeline_fragment_ctx->cancel(reason, msg);
+        }
     }
 
     if (!find_the_fragment) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c7651a687dd6f9..dd59822dcae7c9 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -103,6 +103,8 @@ class QueryContext {
     ThreadPoolToken* get_token() { return _thread_token.get(); }
 
+    [[nodiscard]] bool reach_limit() const { return _reach_limit.load(); }
+    void set_reach_limit() { _reach_limit = true; }
     void set_ready_to_execute(bool is_cancelled) {
         {
             std::lock_guard<std::mutex> l(_start_lock);
@@ -218,6 +220,7 @@ class QueryContext {
     // And all fragments of this query will start execution when this is set to true.
     std::atomic<bool> _ready_to_execute {false};
     std::atomic<bool> _is_cancelled {false};
+    std::atomic<bool> _reach_limit {false};
 
     std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
     std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index b98c628368eeeb..28c1e741b2412a 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -54,7 +54,7 @@ class PipScannerContext : public vectorized::ScannerContext {
         {
             std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
             if (_blocks_queues[id].empty()) {
-                *eos = _is_finished || _should_stop;
+                *eos = done();
                 return Status::OK();
             } else {
                 *block = std::move(_blocks_queues[id].front());
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 3421c914b6b900..0239a4eec75466 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -177,7 +177,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     // (if the scheduler continues to schedule, it will cause a lot of busy running).
     // At this point, consumers are required to trigger new scheduling to ensure that
    // data can be continuously fetched.
-    if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
+    if (!done() && has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
@@ -211,7 +211,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
         _queued_blocks_memory_usage->add(-block_bytes);
         return Status::OK();
     } else {
-        *eos = _is_finished;
+        *eos = done();
     }
     return Status::OK();
 }
@@ -352,6 +352,9 @@ std::string ScannerContext::debug_string() {
 
 void ScannerContext::reschedule_scanner_ctx() {
     std::lock_guard<std::mutex> l(_transfer_lock);
+    if (done()) {
+        return;
+    }
     auto state = _scanner_scheduler->submit(this);
     //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
     if (state.ok()) {
@@ -367,7 +370,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
         _scanners.push_front(scanner);
     }
     std::lock_guard<std::mutex> l(_transfer_lock);
-    if (has_enough_space_in_blocks_queue()) {
+    if (!done() && has_enough_space_in_blocks_queue()) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 2264eab9aa4a17..58353a77575cc2 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -97,12 +97,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
     if (state->is_cancelled()) {
         return Status::Cancelled("cancelled");
     }
-
+    *eof = *eof || _should_stop;
     // set eof to true if per scanner limit is reached
     // currently for query: ORDER BY key LIMIT n
-    if (_limit > 0 && _num_rows_return >= _limit) {
-        *eof = true;
-    }
+    *eof = *eof || (_limit > 0 && _num_rows_return >= _limit);
     return Status::OK();
 }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 0cd5595dd6ba61..72296228be469a 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -676,7 +676,12 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         // 1. calculate range
         // 2. dispatch rows to channel
     }
-    return Status::OK();
+    for (auto channel : _channels) {
+        if (!channel->is_receiver_eof()) {
+            return Status::OK();
+        }
+    }
+    return Status::EndOfFile("all data stream channels EOF");
 }
 
 Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a03d88863a88f0..59c29bd14c59cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1186,7 +1186,7 @@ public RowBatch getNext() throws Exception {
             this.returnedAllResults = true;
 
             // if this query is a block query do not cancel.
-            Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
+            long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
             boolean hasLimit = numLimitRows > 0;
             if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
                 LOG.debug("no block query, return num >= limit rows, need cancel");
@@ -1199,6 +1199,12 @@ public RowBatch getNext() throws Exception {
         } else if (resultBatch.getBatch() != null) {
             numReceivedRows += resultBatch.getBatch().getRowsSize();
         }
+        long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
+        if (numLimitRows > 0) {
+            if (numReceivedRows >= numLimitRows) {
+                cleanRemoteFragmentsAsync(Types.PPlanFragmentCancelReason.LIMIT_REACH);
+            }
+        }
 
         return resultBatch;
     }
@@ -1239,6 +1245,18 @@ private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
         executionProfile.onCancel();
     }
 
+    private void cleanRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cleanReason) {
+        if (enablePipelineEngine) {
+            for (PipelineExecContext ctx : pipelineExecContexts.values()) {
+                ctx.cleanFragmentInstance(cleanReason);
+            }
+        } else {
+            for (BackendExecState backendExecState : backendExecStates) {
+                backendExecState.cleanFragmentInstance(cleanReason);
+            }
+        }
+    }
+
     private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) {
         if (enablePipelineEngine) {
             for (PipelineExecContext ctx : pipelineExecContexts.values()) {
@@ -2695,6 +2713,18 @@ public synchronized void printProfile(StringBuilder builder) {
             this.instanceProfile.prettyPrint(builder, "");
         }
 
+        public synchronized void cleanFragmentInstance(Types.PPlanFragmentCancelReason cleanReason) {
+            if (!initiated || done || hasCanceled) {
+                return;
+            }
+            try {
+                BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+                        fragmentInstanceId(), cleanReason);
+            } catch (RpcException ignored) {
+                // do nothing
+            }
+        }
+
         // cancel the fragment instance.
         // return true if cancel success. Otherwise, return false
         public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
@@ -2854,6 +2884,22 @@ public synchronized void printProfile(StringBuilder builder) {
             });
         }
 
+        // clean all fragment instances, in order to stop the running instances when query is finished.
+        // for query with limit statement.
+        public synchronized void cleanFragmentInstance(Types.PPlanFragmentCancelReason cleanReason) {
+            if (!initiated || done || hasCanceled) {
+                return;
+            }
+            for (TPipelineInstanceParams localParam : rpcParams.local_params) {
+                try {
+                    BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+                            localParam.fragment_instance_id, cleanReason);
+                } catch (RpcException ignored) {
+                    // do nothing
+                }
+            }
+        }
+
         // cancel all fragment instances.
         // return true if cancel success. Otherwise, return false
         public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {