diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 2d79b0d8b2b503..85c87df8f4dbc5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -58,6 +58,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
     _split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
     _distribute_rows_into_channels_timer = ADD_TIMER(_profile, "DistributeRowsIntoChannelsTime");
+    _send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
@@ -276,23 +277,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status ExchangeSinkLocalState::_send_new_partition_batch() {
-    if (_row_distribution.need_deal_batching()) { // maybe try_close more than 1 time
-        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
-        vectorized::Block tmp_block =
-                _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
-        auto& p = _parent->cast<ExchangeSinkOperatorX>();
-        // these order is unique.
-        // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
-        // 2. deal batched block
-        // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
-        _row_distribution.clear_batching_stats();
-        RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
-        // Recovery back
-        _row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
-        _row_distribution._batching_block->clear_column_data();
-        _row_distribution._deal_batched = false;
-    }
+Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) {
+    RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+    auto& p = _parent->cast<ExchangeSinkOperatorX>();
+    // Recovery back
+    _row_distribution.clear_batching_stats();
+    _row_distribution._batching_block->clear_column_data();
+    _row_distribution._deal_batched = false;
+    RETURN_IF_ERROR(p.sink(_state, input_block, false));
     return Status::OK();
 }
 
@@ -512,7 +504,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             old_channel_mem_usage += channel->mem_usage();
         }
         // check out of limit
-        RETURN_IF_ERROR(local_state._send_new_partition_batch());
         std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
         const auto& num_channels = local_state._partition_count;
         std::vector<std::vector<uint32_t>> channel2rows;
@@ -527,21 +518,21 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
                     *block, convert_block, filtered_rows, has_filtered_rows,
                     local_state._row_part_tablet_ids, local_state._number_input_rows));
-
-            const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
-            const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
-            for (int idx = 0; idx < row_ids.size(); ++idx) {
-                const auto& row = row_ids[idx];
-                const auto& tablet_id_hash =
-                        HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
-                channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+            if (local_state._row_distribution.batching_rows() > 0) {
+                SCOPED_TIMER(local_state._send_new_partition_timer);
+                RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
+            } else {
+                const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
+                const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
+                for (int idx = 0; idx < row_ids.size(); ++idx) {
+                    const auto& row = row_ids[idx];
+                    const auto& tablet_id_hash =
+                            HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
+                    channel2rows[tablet_id_hash % num_channels].emplace_back(row);
+                }
             }
         }
-        if (eos) {
-            local_state._row_distribution._deal_batched = true;
-            RETURN_IF_ERROR(local_state._send_new_partition_batch());
-        }
 
         {
             SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
             // the convert_block maybe different with block after execute exprs
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index f0cabb1ffde25c..bee34ad1a854ed 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -96,7 +96,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
     static Status empty_callback_function(void* sender, TCreatePartitionResult* result) {
         return Status::OK();
     }
-    Status _send_new_partition_batch();
+    Status _send_new_partition_batch(vectorized::Block* input_block);
     std::vector<std::shared_ptr<vectorized::Channel>> channels;
     int current_channel_idx {0}; // index of current channel to send to if _random == true
     bool only_local_exchange {false};
@@ -127,6 +127,7 @@
     // Used to counter send bytes under local data exchange
    RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
    RuntimeProfile::Counter* _merge_block_timer = nullptr;
+    RuntimeProfile::Counter* _send_new_partition_timer = nullptr;
    RuntimeProfile::Counter* _wait_queue_timer = nullptr;
    RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
 
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 248982c02026dc..9e4cce6b528e17 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ class VRowDistribution {
                                       std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                       int64_t& rows_stat_val);
     bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
+    size_t batching_rows() const { return _batching_rows; }
     // create partitions when need for auto-partition table using #_partitions_need_create.
     Status automatic_create_partition();
     void clear_batching_stats();
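Note on the control flow above: before this patch, rows targeting not-yet-created partitions of an auto-partition table were parked in `_row_distribution._batching_block` and only flushed once `need_deal_batching()` held (the eos path set `_deal_batched = true` and then flushed). After the patch, `ExchangeSinkOperatorX::sink()` checks `batching_rows()` right after `generate_rows_distribution()`; if any rows are pending, it creates the missing partitions and re-enters `sink()` with the same input block, which now distributes cleanly. Below is a minimal, self-contained sketch of that retry pattern; the types (`MockRowDistribution`, `MockSink`) and their members are hypothetical stand-ins for illustration, not the real BE interfaces.

#include <cstdint>
#include <cstdio>
#include <vector>

// Hypothetical stand-ins for the Doris BE classes; illustrative only.
struct Status {
    bool ok = true;
};

struct MockRowDistribution {
    size_t batching_rows_ = 0;  // rows whose target partition does not exist yet

    // Pretend a third of the rows hit missing partitions on the first pass.
    Status generate_rows_distribution(const std::vector<int64_t>& rows,
                                      bool partitions_ready) {
        batching_rows_ = partitions_ready ? 0 : rows.size() / 3;
        return {};
    }
    size_t batching_rows() const { return batching_rows_; }
    Status automatic_create_partition() { return {}; }  // an RPC in the real code
    void clear_batching_stats() { batching_rows_ = 0; }
};

struct MockSink {
    MockRowDistribution row_dist;
    bool partitions_ready = false;

    // Mirrors the new flow: distribute, and if some rows need partitions that
    // do not exist yet, create them and re-sink the same block.
    Status sink(const std::vector<int64_t>& block) {
        Status st = row_dist.generate_rows_distribution(block, partitions_ready);
        if (!st.ok) return st;
        if (row_dist.batching_rows() > 0) {
            st = row_dist.automatic_create_partition();  // create missing partitions
            if (!st.ok) return st;
            partitions_ready = true;
            row_dist.clear_batching_stats();  // reset state before re-entering
            return sink(block);  // retry the whole block now that partitions exist
        }
        std::printf("distributed %zu rows\n", block.size());
        return {};
    }
};

int main() {
    MockSink s;
    std::vector<int64_t> block = {1, 2, 3, 4, 5, 6};
    s.sink(block);  // first pass triggers partition creation, then re-sinks
    return 0;
}

The sketch also shows why the real `_send_new_partition_batch()` resets state (`clear_batching_stats()`, `clear_column_data()`, `_deal_batched = false`) before calling `p.sink(_state, input_block, false)`: that call re-enters the same sink path, and stale batching state would make the second pass batch the identical rows again instead of distributing them.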