Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "exchange_sink_operator.h"

#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Partitions_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>

Expand Down Expand Up @@ -190,6 +191,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), find_tablet_mode);
_tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id);
_tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false));
_tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size());
for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i]));
}
// if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column
// on exchange node rather than on TabletWriter
_block_convertor =
Expand All @@ -206,7 +211,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
.txn_id = _txn_id,
.pool = p._pool.get(),
.location = _location,
.vec_output_expr_ctxs = &_fake_expr_ctxs,
.vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
.schema = _schema,
.caller = (void*)this,
.create_partition_callback = &ExchangeSinkLocalState::empty_callback_function});
Expand Down Expand Up @@ -297,6 +302,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_location(sink.tablet_sink_location),
_tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
Expand All @@ -309,13 +315,20 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
if (sink.__isset.output_tuple_id) {
_output_tuple_id = sink.output_tuple_id;
}
}

// Initializes the operator from the thrift sink description.
//
// The base DataSinkOperatorX::init runs first; afterwards the work depends on _part_type:
//  - RANGE_PARTITIONED is rejected with an InternalError.
//  - TABLET_SINK_SHUFFLE_PARTITIONED builds _tablet_sink_expr_ctxs from the thrift
//    expressions that _t_tablet_sink_exprs points at.
//  - any other partition type requires no additional setup in this function.
//
// Returns the first non-OK Status encountered, otherwise Status::OK().
Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
    RETURN_IF_ERROR(DataSinkOperatorX::init(tsink));
    switch (_part_type) {
    case TPartitionType::RANGE_PARTITIONED:
        return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used");
    case TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED:
        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs,
                                                             _tablet_sink_expr_ctxs));
        break;
    default:
        // Nothing extra to build for the remaining partition types.
        break;
    }
    return Status::OK();
}

Expand All @@ -324,6 +337,18 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) {
_state = state;
_mem_tracker = std::make_unique<MemTracker>("ExchangeSinkOperatorX:");
_compression_type = state->fragement_transmission_compression_type();
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
if (_output_tuple_id == -1) {
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc()));
} else {
auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false));
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state));
}
return Status::OK();
}

Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

// for shuffle data by partition and tablet
int64_t _txn_id = -1;
vectorized::VExprContextSPtrs _fake_expr_ctxs;
vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr;
std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
Expand Down Expand Up @@ -239,6 +239,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
const std::vector<TExpr> _texprs;

const RowDescriptor& _row_desc;
TTupleId _output_tuple_id = -1;

TPartitionType::type _part_type;

Expand All @@ -265,6 +266,8 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
const TTupleId _tablet_sink_tuple_id;
int64_t _tablet_sink_txn_id = -1;
std::shared_ptr<ObjectPool> _pool;
vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr;

// for external table sink random partition
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,7 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> d
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context);
exchangeNode.updateTupleIds(tupleDescriptor);
exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc());
dataStreamSink.setExchNodeId(exchangeNode.getId());
dataStreamSink.setOutputPartition(dataPartition);
parentFragment.addChild(inputFragment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys
.createLocation(database.getId(), olapTableSink.getDstTable());
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs());
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink {
protected TOlapTableLocationParam tabletSinkLocationParam = null;
protected TupleDescriptor tabletSinkTupleDesc = null;
protected long tabletSinkTxnId = -1;
protected List<Expr> tabletSinkExprs = null;

public DataStreamSink() {

Expand Down Expand Up @@ -145,6 +146,10 @@ public void setTabletSinkLocationParam(TOlapTableLocationParam locationParam) {
this.tabletSinkLocationParam = locationParam;
}

/**
 * Stores the expression list associated with the tablet sink.
 *
 * <p>The list is kept by reference (no copy is made). When it is non-null,
 * {@code toThrift()} converts each expression with {@code treeToThrift()} and
 * appends it to the thrift sink's tablet sink expressions; when it is left
 * {@code null}, nothing is added there.
 *
 * @param tabletSinkExprs expressions to attach to this sink; may be {@code null}
 */
public void setTabletSinkExprs(List<Expr> tabletSinkExprs) {
this.tabletSinkExprs = tabletSinkExprs;
}

/**
 * Records the transaction id associated with the tablet sink.
 *
 * <p>The field defaults to {@code -1} and is always written into the thrift
 * sink by {@code toThrift()}.
 *
 * @param txnId transaction id to store
 */
public void setTabletSinkTxnId(long txnId) {
this.tabletSinkTxnId = txnId;
}
Expand Down Expand Up @@ -224,6 +229,11 @@ protected TDataSink toThrift() {
if (tabletSinkLocationParam != null) {
tStreamSink.setTabletSinkLocation(tabletSinkLocationParam);
}
if (tabletSinkExprs != null) {
for (Expr expr : tabletSinkExprs) {
tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
}
}
tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
result.setStreamSink(tStreamSink);
return result;
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ struct TDataStreamSink {
10: optional Descriptors.TOlapTableLocationParam tablet_sink_location
11: optional i64 tablet_sink_txn_id
12: optional Types.TTupleId tablet_sink_tuple_id
13: optional list<Exprs.TExpr> tablet_sink_exprs
}

struct TMultiCastDataStreamSink {
Expand Down
3 changes: 3 additions & 0 deletions regression-test/data/nereids_p0/insert_into_table/random.out
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,6 @@
13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634
13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634

-- !sql_select --
1 11 11

11 changes: 11 additions & 0 deletions regression-test/suites/nereids_p0/insert_into_table/random.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,15 @@ suite('nereids_insert_random') {
sql 'set delete_without_partition=true'
sql '''delete from dup_t_type_cast_rd where id is not null'''
sql '''delete from dup_t_type_cast_rd where id is null'''

sql 'set enable_strict_consistency_dml=true'
sql 'drop table if exists tbl_1'
sql 'drop table if exists tbl_4'
sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");"""
sql """INSERT INTO tbl_1 VALUES (1, 11);"""
sql 'sync'
sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, k2) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "replication_num" = "1"); """
sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;"""
sql 'sync'
qt_sql_select """ select * from tbl_4; """;
}