From 09cea73e241bde566f049e52474708e618bdaf7f Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 23 Jul 2024 11:48:02 +0800 Subject: [PATCH 1/3] [function](feature) Implement CRC32 function (#38204) --- be/src/vec/functions/function_string.cpp | 28 +++++++- .../doris/catalog/BuiltinScalarFunctions.java | 2 + .../expressions/functions/scalar/Crc32.java | 71 +++++++++++++++++++ .../visitor/ScalarFunctionVisitor.java | 5 ++ gensrc/script/doris_builtins_functions.py | 2 + .../test_string_function_like.out | 12 ++++ .../test_string_function_like.groovy | 4 ++ 7 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32.java diff --git a/be/src/vec/functions/function_string.cpp b/be/src/vec/functions/function_string.cpp index 7ebcac9ca1b381..edf43300f94c82 100644 --- a/be/src/vec/functions/function_string.cpp +++ b/be/src/vec/functions/function_string.cpp @@ -82,7 +82,7 @@ struct NameQuoteImpl { } }; -struct NameStringLenght { +struct NameStringLength { static constexpr auto name = "length"; }; @@ -104,6 +104,28 @@ struct StringLengthImpl { } }; +struct NameCrc32 { + static constexpr auto name = "crc32"; +}; + +struct Crc32Impl { + using ReturnType = DataTypeInt64; + static constexpr auto TYPE_INDEX = TypeIndex::String; + using Type = String; + using ReturnColumnType = ColumnVector; + + static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets, + PaddedPODArray& res) { + auto size = offsets.size(); + res.resize(size); + for (int i = 0; i < size; ++i) { + res[i] = crc32_z(0L, (const unsigned char*)data.data() + offsets[i - 1], + offsets[i] - offsets[i - 1]); + } + return Status::OK(); + } +}; + struct NameStringUtf8Length { static constexpr auto name = "char_length"; }; @@ -1073,7 +1095,8 @@ using StringFindInSetImpl = StringFunctionImpl; -using FunctionStringLength = FunctionUnaryToType; +using FunctionStringLength = FunctionUnaryToType; 
+using FunctionCrc32 = FunctionUnaryToType; using FunctionStringUTF8Length = FunctionUnaryToType; using FunctionStringSpace = FunctionUnaryToType; using FunctionStringStartsWith = @@ -1111,6 +1134,7 @@ using FunctionStringRPad = FunctionStringPad; void register_function_string(SimpleFunctionFactory& factory) { factory.register_function(); factory.register_function(); + factory.register_function(); factory.register_function(); factory.register_function(); factory.register_function(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java index 4b87fa37652510..f84bda52178c9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java @@ -122,6 +122,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Cosh; import org.apache.doris.nereids.trees.expressions.functions.scalar.CosineDistance; import org.apache.doris.nereids.trees.expressions.functions.scalar.CountEqual; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateMap; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateNamedStruct; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateStruct; @@ -723,6 +724,7 @@ public class BuiltinScalarFunctions implements FunctionHelper { scalar(Least.class, "least"), scalar(Left.class, "left"), scalar(Length.class, "length"), + scalar(Crc32.class, "crc32"), scalar(Like.class, "like"), scalar(Ln.class, "ln"), scalar(Locate.class, "locate"), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32.java new file mode 100644 index 
00000000000000..036807062faf2c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Crc32.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.scalar; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; +import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable; +import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.BigIntType; +import org.apache.doris.nereids.types.StringType; +import org.apache.doris.nereids.types.VarcharType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * ScalarFunction 'crc32'. This class is generated by GenerateFunction. 
+ */ +public class Crc32 extends ScalarFunction + implements UnaryExpression, ExplicitlyCastableSignature, PropagateNullable { + + public static final List SIGNATURES = ImmutableList.of( + FunctionSignature.ret(BigIntType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT), + FunctionSignature.ret(BigIntType.INSTANCE).args(StringType.INSTANCE) + ); + + /** + * constructor with 1 argument. + */ + public Crc32(Expression arg) { + super("crc32", arg); + } + + /** + * withChildren. + */ + @Override + public Crc32 withChildren(List children) { + Preconditions.checkArgument(children.size() == 1); + return new Crc32(children.get(0)); + } + + @Override + public List getSignatures() { + return SIGNATURES; + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitCrc32(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java index a2562baa7588b8..79b8452e1df61c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java @@ -129,6 +129,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Cosh; import org.apache.doris.nereids.trees.expressions.functions.scalar.CosineDistance; import org.apache.doris.nereids.trees.expressions.functions.scalar.CountEqual; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Crc32; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateMap; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateNamedStruct; import org.apache.doris.nereids.trees.expressions.functions.scalar.CreateStruct; @@ -1450,6 +1451,10 @@ default R visitLength(Length length, C context) { return visitScalarFunction(length, context); 
} + default R visitCrc32(Crc32 crc32, C context) { + return visitScalarFunction(crc32, context); + } + default R visitLike(Like like, C context) { return visitStringRegexPredicate(like, context); } diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 4b5e113faee614..a05f6ac8abbbbb 100644 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -1574,6 +1574,7 @@ [['rpad'], 'VARCHAR', ['VARCHAR', 'INT', 'VARCHAR'], 'ALWAYS_NULLABLE'], [['append_trailing_char_if_absent'], 'VARCHAR', ['VARCHAR', 'VARCHAR'], 'ALWAYS_NULLABLE'], [['length'], 'INT', ['VARCHAR'], ''], + [['crc32'], 'BIGINT', ['VARCHAR'], ''], [['bit_length'], 'INT', ['VARCHAR'], ''], [['char_length', 'character_length'], 'INT', ['VARCHAR'], ''], @@ -1639,6 +1640,7 @@ [['rpad'], 'STRING', ['STRING', 'INT', 'STRING'], 'ALWAYS_NULLABLE'], [['append_trailing_char_if_absent'], 'STRING', ['STRING', 'STRING'], 'ALWAYS_NULLABLE'], [['length'], 'INT', ['STRING'], ''], + [['crc32'], 'BIGINT', ['STRING'], ''], [['bit_length'], 'INT', ['STRING'], ''], [['char_length', 'character_length'], 'INT', ['STRING'], ''], diff --git a/regression-test/data/query_p0/sql_functions/string_functions/test_string_function_like.out b/regression-test/data/query_p0/sql_functions/string_functions/test_string_function_like.out index 51fbfc68af1adc..9fcfc2d6ee9854 100644 --- a/regression-test/data/query_p0/sql_functions/string_functions/test_string_function_like.out +++ b/regression-test/data/query_p0/sql_functions/string_functions/test_string_function_like.out @@ -245,3 +245,15 @@ bb -- !sql -- +-- !crc32_1 -- +348606243 + +-- !crc32_2 -- +130583814 + +-- !crc32_3 -- +2707236321 + +-- !crc32_4 -- +\N + diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function_like.groovy b/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function_like.groovy index 9c5deed8651f95..e092526b03cfa3 
100644 --- a/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function_like.groovy +++ b/regression-test/suites/query_p0/sql_functions/string_functions/test_string_function_like.groovy @@ -90,4 +90,8 @@ suite("test_string_function_like") { qt_sql "SELECT k FROM ${tbName} WHERE NOT LIKE(k, \"%\") ORDER BY k;" // sql "DROP TABLE ${tbName};" + qt_crc32_1 "select crc32(\"DORIS\");" + qt_crc32_2 "select crc32(\"APACHE DORIS\");" + qt_crc32_3 "select crc32(10);" + qt_crc32_4 "select crc32(NULL);" } From cd1183751bf716b3760728e59239123d32e185b4 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Oct 2024 11:21:27 +0800 Subject: [PATCH 2/3] [local exchange](fix) Fix correctness caused by local exchange (#41555) For plan `local exchange (hash shuffle) -> union -> colocated agg`, we must ensure local exchange uses the same hash algorithm as MPP shuffling. This problem is covered by our test cases but can only be reproduced on multiple BEs, so no case is added in this PR. --- .../pipeline/exec/aggregation_sink_operator.h | 2 +- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- .../distinct_streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- .../pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/operator.h | 11 +++-- .../partitioned_hash_join_probe_operator.h | 2 +- .../partitioned_hash_join_sink_operator.h | 2 +- be/src/pipeline/exec/sort_sink_operator.h | 2 +- be/src/pipeline/exec/union_sink_operator.h | 6 +++ be/src/pipeline/exec/union_source_operator.h | 5 +++ be/src/pipeline/pipeline_x/operator.h | 4 -- .../pipeline_x_fragment_context.cpp | 43 ++++++++++--------- 13 files changed, 49 insertions(+), 36 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index de1f26057ff185..e082d803bcb238 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -164,7
+164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index eb65414206c5b4..0269ba15be04da 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 97b704078c63ec..b2c1c414314c25 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX { } } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } + private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 60530521ec0a82..0c29374e690960 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX { return Status::OK(); } [[nodiscard]] int get_child_count() const { return _child_size; } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: bool _has_data(RuntimeState* state) const { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 72f47b576f6e15..e32176a08d1555 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -231,8 +231,6 @@ class OperatorXBase : public OperatorBase { [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - bool can_read() override { LOG(FATAL) << "should not reach here!"; return false; @@ -627,8 
+625,6 @@ class DataSinkOperatorXBase : public OperatorBase { : DataDistribution(ExchangeType::NOOP); } - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ac527ed8e69888..a3dff107f1bd3a 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -771,7 +771,7 @@ Status PipelineXFragmentContext::_create_tree_helper( ObjectPool* pool, const std::vector& tnodes, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe, - int child_idx, const bool followed_by_shuffled_join) { + int child_idx, const bool followed_by_shuffled_operator) { // propagate error case if (*node_idx >= tnodes.size()) { // TODO: print thrift msg @@ -782,11 +782,11 @@ Status PipelineXFragmentContext::_create_tree_helper( const TPlanNode& tnode = tnodes[*node_idx]; int num_children = tnodes[*node_idx].num_children; - bool current_followed_by_shuffled_join = followed_by_shuffled_join; + bool current_followed_by_shuffled_operator = followed_by_shuffled_operator; OperatorXPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, parent == nullptr ? -1 : parent->node_id(), child_idx, - followed_by_shuffled_join)); + followed_by_shuffled_operator)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -797,7 +797,7 @@ Status PipelineXFragmentContext::_create_tree_helper( } /** - * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join. 
+ * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). * * For plan: * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) @@ -811,8 +811,8 @@ Status PipelineXFragmentContext::_create_tree_helper( cur_pipe->operator_xs().empty() ? cur_pipe->sink_x()->require_shuffled_data_distribution() : op->require_shuffled_data_distribution(); - current_followed_by_shuffled_join = - (followed_by_shuffled_join || op->is_shuffled_hash_join()) && + current_followed_by_shuffled_operator = + (followed_by_shuffled_operator || op->is_shuffled_operator()) && require_shuffled_data_distribution; cur_pipe->_name.push_back('-'); @@ -823,7 +823,7 @@ Status PipelineXFragmentContext::_create_tree_helper( for (int i = 0; i < num_children; i++) { ++*node_idx; RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr, - cur_pipe, i, current_followed_by_shuffled_join)); + cur_pipe, i, current_followed_by_shuffled_operator)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail @@ -865,13 +865,13 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. */ - const bool followed_by_shuffled_join = - operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join() - : cur_pipe->sink_x()->followed_by_shuffled_join(); + const bool followed_by_shuffled_operator = + operator_xs.size() > idx ? 
operator_xs[idx]->followed_by_shuffled_operator() + : cur_pipe->sink_x()->followed_by_shuffled_operator(); const bool should_disable_bucket_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_join; + followed_by_shuffled_operator; sink.reset(new LocalExchangeSinkOperatorX( sink_id, local_exchange_id, should_disable_bucket_shuffle ? _total_instances : _num_instances, @@ -1047,7 +1047,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); @@ -1121,7 +1121,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); - op->set_followed_by_shuffled_join(followed_by_shuffled_join); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && @@ -1152,7 +1152,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); 
sink->set_dests_id({op->operator_id()}); @@ -1203,8 +1203,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1225,8 +1225,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); @@ -1256,6 +1256,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::UNION_NODE: { int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -1298,7 +1299,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + 
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1338,7 +1339,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1349,11 +1350,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: { From 0e1dd7b266515434321457e817995b1caf80e8cc Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Oct 2024 17:21:56 +0800 Subject: [PATCH 3/3] [fix](schema scan) Finish schema scanner if limitation is reached (#41592) --- be/src/pipeline/exec/schema_scan_operator.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index d5353655ab070a..73e54d52be25ca 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -285,6 +285,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* 
bl } while (block->rows() == 0 && !*eos); local_state.reached_limit(block, eos); + if (*eos) { + local_state._finish_dependency->set_always_ready(); + } return Status::OK(); }