diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 366b3c682f7dd5..59d2d5f0551eb1 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -18,6 +18,7 @@ #include "exchange_sink_operator.h" #include +#include #include #include @@ -190,6 +191,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { std::make_unique(_vpartition.get(), find_tablet_mode); _tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id); _tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); + _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size()); + for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i])); + } // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column // on exchange node rather than on TabletWriter _block_convertor = @@ -206,7 +211,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { .txn_id = _txn_id, .pool = p._pool.get(), .location = _location, - .vec_output_expr_ctxs = &_fake_expr_ctxs, + .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs, .schema = _schema, .caller = (void*)this, .create_partition_callback = &ExchangeSinkLocalState::empty_callback_function}); @@ -297,6 +302,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_location(sink.tablet_sink_location), _tablet_sink_tuple_id(sink.tablet_sink_tuple_id), _tablet_sink_txn_id(sink.tablet_sink_txn_id), + _t_tablet_sink_exprs(&sink.tablet_sink_exprs), _enable_local_merge_sort(state->enable_local_merge_sort()) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || @@ -309,6 +315,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED); _name = "ExchangeSinkOperatorX"; _pool = std::make_shared(); + if (sink.__isset.output_tuple_id) { + _output_tuple_id = sink.output_tuple_id; + } } Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { @@ -316,6 +325,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); } + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs, + _tablet_sink_expr_ctxs)); + } return Status::OK(); } @@ -324,6 +337,18 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) { _state = state; _mem_tracker = std::make_unique("ExchangeSinkOperatorX:"); _compression_type = state->fragement_transmission_compression_type(); + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + if (_output_tuple_id == -1) { + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc())); + } else { + auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false)); + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc)); + } + RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state)); + } return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index dc07773d5ccecb..300e2a5172f3d1 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -183,7 +183,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { // for shuffle data by partition and tablet int64_t _txn_id = -1; - vectorized::VExprContextSPtrs _fake_expr_ctxs; + vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; std::unique_ptr _vpartition = nullptr; std::unique_ptr _tablet_finder = nullptr; std::shared_ptr _schema = nullptr; @@ -239,6 +239,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _texprs; const RowDescriptor& _row_desc; + TTupleId _output_tuple_id = -1; TPartitionType::type _part_type; @@ -265,6 +266,8 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _pool; + vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; + const std::vector* _t_tablet_sink_exprs = nullptr; // for external table sink random partition // Control the number of channels according to the flow, thereby controlling the number of table sink writers. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 1a56c0fa16076c..81264a5d6acd51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -361,8 +361,7 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); - TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context); - exchangeNode.updateTupleIds(tupleDescriptor); + exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc()); dataStreamSink.setExchNodeId(exchangeNode.getId()); dataStreamSink.setOutputPartition(dataPartition); parentFragment.addChild(inputFragment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index b57ac3834958d6..e38ee40bc9a7a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -173,6 +173,7 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys .createLocation(database.getId(), olapTableSink.getDstTable()); dataStreamSink.setTabletSinkLocationParam(locationParams.get(0)); dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId()); + dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs()); } } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index b9cf516bc3d2cc..ef42190fa25004 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink { protected TOlapTableLocationParam tabletSinkLocationParam = null; protected TupleDescriptor tabletSinkTupleDesc = null; protected long tabletSinkTxnId = -1; + protected List tabletSinkExprs = null; public DataStreamSink() { @@ -145,6 +146,10 @@ public void setTabletSinkLocationParam(TOlapTableLocationParam locationParam) { this.tabletSinkLocationParam = locationParam; } + public void setTabletSinkExprs(List tabletSinkExprs) { + this.tabletSinkExprs = tabletSinkExprs; + } + public void setTabletSinkTxnId(long txnId) { this.tabletSinkTxnId = txnId; } @@ -224,6 +229,11 @@ protected TDataSink toThrift() { if (tabletSinkLocationParam != null) { tStreamSink.setTabletSinkLocation(tabletSinkLocationParam); } + if (tabletSinkExprs != null) { + for (Expr expr : tabletSinkExprs) { + tStreamSink.addToTabletSinkExprs(expr.treeToThrift()); + } + } tStreamSink.setTabletSinkTxnId(tabletSinkTxnId); result.setStreamSink(tStreamSink); return result; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index e46f7e6067cfef..ed7ccee69cd9a1 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -188,6 +188,7 @@ struct TDataStreamSink { 10: optional Descriptors.TOlapTableLocationParam tablet_sink_location 11: optional i64 tablet_sink_txn_id 12: optional Types.TTupleId tablet_sink_tuple_id + 13: optional list tablet_sink_exprs } struct TMultiCastDataStreamSink { diff --git a/regression-test/data/nereids_p0/insert_into_table/random.out b/regression-test/data/nereids_p0/insert_into_table/random.out index d42426a991f801..dd5bdc8e1d9bb0 100644 --- a/regression-test/data/nereids_p0/insert_into_table/random.out +++ b/regression-test/data/nereids_p0/insert_into_table/random.out @@ -135,3 +135,6 @@ 13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634 13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634 +-- !sql_select -- +1 11 11 + diff --git a/regression-test/suites/nereids_p0/insert_into_table/random.groovy b/regression-test/suites/nereids_p0/insert_into_table/random.groovy index 6cc5cb2b991514..f820ca89bd2de0 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/random.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/random.groovy @@ -43,4 +43,15 @@ suite('nereids_insert_random') { sql 'set delete_without_partition=true' sql '''delete from dup_t_type_cast_rd where id is not null''' sql '''delete from dup_t_type_cast_rd where id is null''' + + sql 'set enable_strict_consistency_dml=true' + sql 'drop table if exists tbl_1' + sql 'drop table if exists tbl_4' + sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");""" + sql """INSERT INTO tbl_1 VALUES (1, 11);""" + sql 'sync' + sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, k2) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "replication_num" = "1"); """ + sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;""" + sql 'sync' + qt_sql_select """ select * from tbl_4; """; }