From b9188406ec2fb2b49b62211a0a2743e38c7336ca Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 23 Aug 2023 19:40:17 +0800 Subject: [PATCH 1/2] [Improvement](pipeline) Terminate early for short-circuit join (#23378) --- be/src/exec/exec_node.h | 2 ++ be/src/pipeline/exec/operator.h | 4 +++ be/src/pipeline/pipeline_task.cpp | 8 +++--- be/src/pipeline/pipeline_task.h | 38 ++++++++++++++++++++++++-- be/src/vec/exec/join/vjoin_node_base.h | 2 ++ 5 files changed, 48 insertions(+), 6 deletions(-) diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index ad7eb83074c9f0..ae7b40dc20eae8 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -134,6 +134,8 @@ class ExecNode { bool can_read() const { return _can_read; } + [[nodiscard]] virtual bool can_terminate_early() { return false; } + // Sink Data to ExecNode to do some stock work, both need impl with method: get_result // `eos` means source is exhausted, exec node should do some finalize work // Eg: Aggregation, Sort diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index acf55cb7bc465a..38ed45ed89f586 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -199,6 +199,8 @@ class OperatorBase { virtual bool can_write() { return false; } // for sink + [[nodiscard]] virtual bool can_terminate_early() { return false; } + /** * The main method to execute a pipeline task. * Now it is a pull-based pipeline and operators pull data from its child by this method. @@ -321,6 +323,8 @@ class StreamingOperator : public OperatorBase { ~StreamingOperator() override = default; + [[nodiscard]] bool can_terminate_early() override { return _node->can_terminate_early(); } + Status prepare(RuntimeState* state) override { _node->increase_ref(); _use_projection = _node->has_output_row_descriptor(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4d0fb49de80ff5..645028a3dc07c3 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -222,11 +222,11 @@ Status PipelineTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); return Status::OK(); } - if (!_source->can_read()) { + if (!source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); return Status::OK(); } - if (!_sink->can_write()) { + if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); return Status::OK(); } @@ -234,11 +234,11 @@ Status PipelineTask::execute(bool* eos) { this->set_begin_execute_time(); while (!_fragment_context->is_canceled()) { - if (_data_state != SourceState::MORE_DATA && !_source->can_read()) { + if (_data_state != SourceState::MORE_DATA && !source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); break; } - if (!_sink->can_write()) { + if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); break; } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 62de1fd2815982..dcd65c469b738b 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -152,13 +152,47 @@ class PipelineTask { return false; } - bool source_can_read() { return _source->can_read(); } + bool source_can_read() { return _source->can_read() || ignore_blocking_source();; } bool runtime_filters_are_ready_or_timeout() { return _source->runtime_filters_are_ready_or_timeout(); } - bool sink_can_write() { return _sink->can_write(); } + bool sink_can_write() { return _sink->can_write() || ignore_blocking_sink(); } + /** + * Consider the query plan below: + * + * ExchangeSource JoinBuild1 + * \ / + * JoinProbe1 (Right Outer) JoinBuild2 + * \ / + * JoinProbe2 (Right Outer) + * | + * Sink + * + * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource + * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows. + * + * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will + * produce more data. + * + * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be blocked by ExchangeSource + * and Sink because JoinProbe2 will always produce 0 rows and terminate early. + * + * In a nutshell, we should follow the rules: + * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator. + * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator. + */ + [[nodiscard]] virtual bool ignore_blocking_sink() { return _root->can_terminate_early(); } + + [[nodiscard]] virtual bool ignore_blocking_source() { + for (size_t i = 1; i < _operators.size(); i++) { + if (_operators[i]->can_terminate_early()) { + return true; + } + } + return false; + } Status finalize(); diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 120e77785ed795..8756c24d20515a 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -74,6 +74,8 @@ class VJoinNodeBase : public ExecNode { virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + [[nodiscard]] bool can_terminate_early() override { return _short_circuit_for_probe; } + protected: // Construct the intermediate blocks to store the results from join operation. void _construct_mutable_join_block(); From ef92e6ab5f528712b222f789fb55697a7c8f6abe Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 24 Aug 2023 09:49:57 +0800 Subject: [PATCH 2/2] update --- be/src/pipeline/pipeline_task.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index dcd65c469b738b..34382a3f7c5356 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -152,7 +152,10 @@ class PipelineTask { return false; } - bool source_can_read() { return _source->can_read() || ignore_blocking_source();; } + bool source_can_read() { + return _source->can_read() || ignore_blocking_source(); + ; + } bool runtime_filters_are_ready_or_timeout() { return _source->runtime_filters_are_ready_or_timeout(); @@ -183,9 +186,9 @@ class PipelineTask { * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator. * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator. */ - [[nodiscard]] virtual bool ignore_blocking_sink() { return _root->can_terminate_early(); } + [[nodiscard]] bool ignore_blocking_sink() { return _root->can_terminate_early(); } - [[nodiscard]] virtual bool ignore_blocking_source() { + [[nodiscard]] bool ignore_blocking_source() { for (size_t i = 1; i < _operators.size(); i++) { if (_operators[i]->can_terminate_early()) { return true;