From 5c91126a04f42b7070e2c7bc1c4d3d7daa61e4c0 Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Wed, 18 Sep 2024 10:40:52 +0800 Subject: [PATCH 1/3] [Bug](exchange) fix tablet sink shuffle without project not match the output tuple (#40299) ``` INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1; the tbl_1 have k1,k2 columns the tbl_4 have k1,k2,v columns if without project expr, will be only two columns not match the output tuple. ``` the co-auther of FE code from @starocean999 --- .../pipeline/exec/exchange_sink_operator.cpp | 31 +++++++++++++++++-- be/src/pipeline/exec/exchange_sink_operator.h | 5 ++- .../translator/PhysicalPlanTranslator.java | 3 +- .../commands/insert/OlapInsertExecutor.java | 1 + .../apache/doris/planner/DataStreamSink.java | 10 ++++++ gensrc/thrift/DataSinks.thrift | 1 + .../nereids_p0/insert_into_table/random.out | 3 ++ .../insert_into_table/random.groovy | 11 +++++++ 8 files changed, 59 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e4150b4f7ac68e..cc809da69d9df1 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -18,6 +18,7 @@ #include "exchange_sink_operator.h" #include +#include #include #include @@ -249,6 +250,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { std::make_unique(_vpartition.get(), find_tablet_mode); _tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id); _tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); + _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size()); + for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i])); + } // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column // on exchange node rather than on TabletWriter _block_convertor = @@ -265,7 +270,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { .txn_id = _txn_id, .pool = p._pool.get(), .location = _location, - .vec_output_expr_ctxs = &_fake_expr_ctxs, + .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs, .schema = _schema, .caller = (void*)this, .create_partition_callback = &ExchangeSinkLocalState::empty_callback_function}); @@ -355,7 +360,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_partition(sink.tablet_sink_partition), _tablet_sink_location(sink.tablet_sink_location), _tablet_sink_tuple_id(sink.tablet_sink_tuple_id), - _tablet_sink_txn_id(sink.tablet_sink_txn_id) { + _tablet_sink_txn_id(sink.tablet_sink_txn_id), + _t_tablet_sink_exprs(&sink.tablet_sink_exprs) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -367,6 +373,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED); _name = "ExchangeSinkOperatorX"; _pool = std::make_shared(); + if (sink.__isset.output_tuple_id) { + _output_tuple_id = sink.output_tuple_id; + } } Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { @@ -374,6 +383,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); } + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs, + _tablet_sink_expr_ctxs)); + } return Status::OK(); } @@ -386,6 +399,18 @@ Status ExchangeSinkOperatorX::prepare(RuntimeState* state) { Status ExchangeSinkOperatorX::open(RuntimeState* state) { DCHECK(state != nullptr); _compression_type = state->fragement_transmission_compression_type(); + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + if (_output_tuple_id == -1) { + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc())); + } else { + auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false)); + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc)); + } + RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state)); + } return Status::OK(); } @@ -535,7 +560,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block RETURN_IF_ERROR(local_state._send_new_partition_batch()); } RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, - channel2rows, convert_block.get(), eos)); + channel2rows, block, eos)); } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { { SCOPED_TIMER(local_state._split_block_hash_compute_timer); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 5a7b8bf4201c27..a94392b906d259 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -217,7 +217,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { // for shuffle data by partition and tablet int64_t _txn_id = -1; - vectorized::VExprContextSPtrs _fake_expr_ctxs; + vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; std::unique_ptr _vpartition = nullptr; std::unique_ptr _tablet_finder = nullptr; std::shared_ptr _schema = nullptr; @@ -273,6 +273,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _texprs; const RowDescriptor& _row_desc; + TTupleId _output_tuple_id = -1; TPartitionType::type _part_type; @@ -299,6 +300,8 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _pool; + vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; + const std::vector* _t_tablet_sink_exprs = nullptr; // for external table sink random partition // Control the number of channels according to the flow, thereby controlling the number of table sink writers. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index a3d3a1885f3f8d..c0d1aeb917f959 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -353,8 +353,7 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute d MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); - TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context); - exchangeNode.updateTupleIds(tupleDescriptor); + exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc()); dataStreamSink.setExchNodeId(exchangeNode.getId()); dataStreamSink.setOutputPartition(dataPartition); parentFragment.addChild(inputFragment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 8fc6d6cbd14f3c..f522a9568994d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -174,6 +174,7 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys .createLocation(database.getId(), olapTableSink.getDstTable()); dataStreamSink.setTabletSinkLocationParam(locationParams.get(0)); dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId()); + dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs()); } } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index b9cf516bc3d2cc..ef42190fa25004 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink { protected TOlapTableLocationParam tabletSinkLocationParam = null; protected TupleDescriptor tabletSinkTupleDesc = null; protected long tabletSinkTxnId = -1; + protected List tabletSinkExprs = null; public DataStreamSink() { @@ -145,6 +146,10 @@ public void setTabletSinkLocationParam(TOlapTableLocationParam locationParam) { this.tabletSinkLocationParam = locationParam; } + public void setTabletSinkExprs(List tabletSinkExprs) { + this.tabletSinkExprs = tabletSinkExprs; + } + public void setTabletSinkTxnId(long txnId) { this.tabletSinkTxnId = txnId; } @@ -224,6 +229,11 @@ protected TDataSink toThrift() { if (tabletSinkLocationParam != null) { tStreamSink.setTabletSinkLocation(tabletSinkLocationParam); } + if (tabletSinkExprs != null) { + for (Expr expr : tabletSinkExprs) { + tStreamSink.addToTabletSinkExprs(expr.treeToThrift()); + } + } tStreamSink.setTabletSinkTxnId(tabletSinkTxnId); result.setStreamSink(tStreamSink); return result; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 6610de4b688a8b..dfdbbcc0a9f418 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -188,6 +188,7 @@ struct TDataStreamSink { 10: optional Descriptors.TOlapTableLocationParam tablet_sink_location 11: optional i64 tablet_sink_txn_id 12: optional Types.TTupleId tablet_sink_tuple_id + 13: optional list tablet_sink_exprs } struct TMultiCastDataStreamSink { diff --git a/regression-test/data/nereids_p0/insert_into_table/random.out b/regression-test/data/nereids_p0/insert_into_table/random.out index d42426a991f801..dd5bdc8e1d9bb0 100644 --- a/regression-test/data/nereids_p0/insert_into_table/random.out +++ b/regression-test/data/nereids_p0/insert_into_table/random.out @@ -135,3 +135,6 @@ 13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634 13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634 +-- !sql_select -- +1 11 11 + diff --git a/regression-test/suites/nereids_p0/insert_into_table/random.groovy b/regression-test/suites/nereids_p0/insert_into_table/random.groovy index 6cc5cb2b991514..f820ca89bd2de0 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/random.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/random.groovy @@ -43,4 +43,15 @@ suite('nereids_insert_random') { sql 'set delete_without_partition=true' sql '''delete from dup_t_type_cast_rd where id is not null''' sql '''delete from dup_t_type_cast_rd where id is null''' + + sql 'set enable_strict_consistency_dml=true' + sql 'drop table if exists tbl_1' + sql 'drop table if exists tbl_4' + sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");""" + sql """INSERT INTO tbl_1 VALUES (1, 11);""" + sql 'sync' + sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, k2) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "replication_num" = "1"); """ + sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;""" + sql 'sync' + qt_sql_select """ select * from tbl_4; """; } From bc0d1e5c0905bae7fdb12c6f4c58d6d1796adfb3 Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Thu, 26 Sep 2024 11:04:44 +0800 Subject: [PATCH 2/3] [Bug](tablet-shuffle) tablet shuffle sink data should use input block rather than convert_block (#41293) ## Proposed changes the convert_block is empty firstly, after execute exprs will be filled. so convert_block maybe different with block. so when send data we still need use block. --- .../pipeline/exec/exchange_sink_operator.cpp | 2 + .../nereids_p0/insert_into_table/random.out | 6 ++ .../insert_into_table/random.groovy | 76 +++++++++++++++++++ 3 files changed, 84 insertions(+) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index cc809da69d9df1..800f81b839bd1a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -559,6 +559,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block local_state._row_distribution._deal_batched = true; RETURN_IF_ERROR(local_state._send_new_partition_batch()); } + // the convert_block maybe different with block after execute exprs + // when send data we still use block RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, channel2rows, block, eos)); } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { diff --git a/regression-test/data/nereids_p0/insert_into_table/random.out b/regression-test/data/nereids_p0/insert_into_table/random.out index dd5bdc8e1d9bb0..c774e023267bc2 100644 --- a/regression-test/data/nereids_p0/insert_into_table/random.out +++ b/regression-test/data/nereids_p0/insert_into_table/random.out @@ -138,3 +138,9 @@ -- !sql_select -- 1 11 11 +-- !sql_select2 -- +1 + +-- !sql_select3 -- +601022201389484209 2024-04-09T20:58:49 卖卖 {"is_poi_first_order":0} + diff --git a/regression-test/suites/nereids_p0/insert_into_table/random.groovy b/regression-test/suites/nereids_p0/insert_into_table/random.groovy index f820ca89bd2de0..9edd855a9a8e56 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/random.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/random.groovy @@ -54,4 +54,80 @@ suite('nereids_insert_random') { sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;""" sql 'sync' qt_sql_select """ select * from tbl_4; """; + + + sql 'drop table if exists tbl_5' + sql 'drop table if exists tbl_6' + sql 'drop table if exists tbl_7' + + sql """ + CREATE TABLE `tbl_5` ( + `orderId` varchar(96) NOT NULL, + `updated_at` datetime NOT NULL, + `userLabel` varchar(255) NULL, + `userTag` variant NULL + ) ENGINE=OLAP + duplicate KEY(`orderId`, `updated_at`) + DISTRIBUTED BY HASH(`orderId`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + CREATE TABLE tbl_6 + ( + order_id VARCHAR(96) NOT NULL, + updated_at DATETIMEV2 NOT NULL + ) ENGINE=OLAP + duplicate KEY(`order_id`) + DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ INSERT INTO `tbl_6` values('601022201389484209', '2024-04-09 20:58:49');""" + + sql """ + CREATE TABLE tbl_7 + ( + orderId VARCHAR(96) NOT NULL, + userLabel VARIANT NULL + )ENGINE=OLAP + UNIQUE KEY(`orderId`) + DISTRIBUTED BY HASH(orderId) BUCKETS AUTO + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """INSERT INTO `tbl_7` values('601022201389484209','{\"is_poi_first_order\":0}');""" + + sql 'sync' + qt_sql_select2 """ INSERT INTO + tbl_5 + SELECT + A.order_id as orderId, + A.updated_at, + CASE + WHEN LOCATE('下单1次', CAST(B.userLabel AS STRING)) > 0 + OR LOCATE('买买', CAST(B.userLabel AS STRING)) > 0 then '买买' + when B.userLabel ["is_poi_first_order"] = 1 then '买买' + else '卖卖' + end as userLabel, + B.userLabel AS `userTag` + FROM + ( + select + order_id,updated_at + from + tbl_6 + ) AS A + LEFT JOIN ( + select + orderId,userLabel + from + tbl_7 + ) AS B ON A.order_id = B.orderId; """; + qt_sql_select3 """ select * from tbl_5; """; + } From 2bd700d38a70b70d43a1c6695baacf81aed63a2b Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Sat, 12 Oct 2024 11:40:28 +0800 Subject: [PATCH 3/3] update --- be/src/pipeline/exec/exchange_sink_operator.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 800f81b839bd1a..7584c0b0e4591c 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -401,8 +401,8 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) { _compression_type = state->fragement_transmission_compression_type(); if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { if (_output_tuple_id == -1) { - RETURN_IF_ERROR( - vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, + _child_x->row_desc())); } else { auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false));