51 changes: 21 additions & 30 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -58,6 +58,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
_split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
_distribute_rows_into_channels_timer = ADD_TIMER(_profile, "DistributeRowsIntoChannelsTime");
_send_new_partition_timer = ADD_TIMER(_profile, "SendNewPartitionTime");
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
@@ -276,23 +277,14 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}

Status ExchangeSinkLocalState::_send_new_partition_batch() {
if (_row_distribution.need_deal_batching()) { // try_close may be called more than once
RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
vectorized::Block tmp_block =
_row_distribution._batching_block->to_block(); // Borrow out, for lval ref
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// This order is required:
// 1. clear the batching stats (and set the flag) so that we don't start a new batching pass while handling the batched block.
// 2. sink the batched block.
// 3. reuse the columns of the borrowed block; sink() does not actually modify them, it builds a new block from them.
_row_distribution.clear_batching_stats();
RETURN_IF_ERROR(p.sink(_state, &tmp_block, false));
// Give the borrowed columns back to the batching block
_row_distribution._batching_block->set_mutable_columns(tmp_block.mutate_columns());
_row_distribution._batching_block->clear_column_data();
_row_distribution._deal_batched = false;
}
Status ExchangeSinkLocalState::_send_new_partition_batch(vectorized::Block* input_block) {
RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// Reset the batching state before re-sinking the input block
_row_distribution.clear_batching_stats();
_row_distribution._batching_block->clear_column_data();
_row_distribution._deal_batched = false;
RETURN_IF_ERROR(p.sink(_state, input_block, false));
return Status::OK();
}

@@ -512,7 +504,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
old_channel_mem_usage += channel->mem_usage();
}
// check out of limit
RETURN_IF_ERROR(local_state._send_new_partition_batch());
std::shared_ptr<vectorized::Block> convert_block = std::make_shared<vectorized::Block>();
const auto& num_channels = local_state._partition_count;
std::vector<std::vector<uint32>> channel2rows;
@@ -527,21 +518,21 @@
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
*block, convert_block, filtered_rows, has_filtered_rows,
local_state._row_part_tablet_ids, local_state._number_input_rows));

const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
for (int idx = 0; idx < row_ids.size(); ++idx) {
const auto& row = row_ids[idx];
const auto& tablet_id_hash =
HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
channel2rows[tablet_id_hash % num_channels].emplace_back(row);
if (local_state._row_distribution.batching_rows() > 0) {
SCOPED_TIMER(local_state._send_new_partition_timer);
RETURN_IF_ERROR(local_state._send_new_partition_batch(block));
} else {
const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
for (int idx = 0; idx < row_ids.size(); ++idx) {
const auto& row = row_ids[idx];
const auto& tablet_id_hash =
HashUtil::zlib_crc_hash(&tablet_ids[idx], sizeof(int64), 0);
channel2rows[tablet_id_hash % num_channels].emplace_back(row);
}
}
}

if (eos) {
local_state._row_distribution._deal_batched = true;
RETURN_IF_ERROR(local_state._send_new_partition_batch());
}
{
SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
// convert_block may differ from block after the exprs are executed
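For reference, the following is a minimal standalone sketch (not Doris code) of the control flow this change introduces in ExchangeSinkOperatorX::sink: when generate_rows_distribution leaves rows batched because their target partitions must be auto-created, the operator now creates the partitions, clears the batching state, and re-sinks the whole input block; otherwise it routes each row to a channel by a CRC hash of its tablet id. The types Block and RowDistribution and the functions sink/tablet_hash below are hypothetical stand-ins for the real classes, with only the names from the diff (batching_rows, automatic_create_partition, clear_batching_stats) kept.

#include <cstdint>
#include <iostream>
#include <vector>
#include <zlib.h>  // crc32(); build with -lz

struct Block {
    std::vector<int64_t> tablet_ids;  // one tablet id per row (stand-in payload)
};

struct RowDistribution {
    size_t pending = 0;  // rows deferred until their partitions exist
    size_t batching_rows() const { return pending; }
    bool automatic_create_partition() { return true; }  // pretend the create-partition RPC succeeds
    void clear_batching_stats() { pending = 0; }
};

// Stands in for HashUtil::zlib_crc_hash(&tablet_id, sizeof(int64), 0) used in the diff.
static uint32_t tablet_hash(int64_t tablet_id) {
    return static_cast<uint32_t>(crc32(
            0, reinterpret_cast<const unsigned char*>(&tablet_id), sizeof(tablet_id)));
}

// Returns false on failure, standing in for RETURN_IF_ERROR.
bool sink(RowDistribution& dist, const Block& block, size_t num_channels,
          std::vector<std::vector<uint32_t>>& channel2rows) {
    // generate_rows_distribution() would run here and may leave rows batched
    // when their target partitions do not exist yet (modeled by dist.pending).
    if (dist.batching_rows() > 0) {
        // New path: create the missing partitions, reset the batching state,
        // then sink the very same input block again.
        if (!dist.automatic_create_partition()) return false;
        dist.clear_batching_stats();
        return sink(dist, block, num_channels, channel2rows);
    }
    // Normal path: route every row to a channel by a CRC hash of its tablet id.
    for (uint32_t row = 0; row < block.tablet_ids.size(); ++row) {
        channel2rows[tablet_hash(block.tablet_ids[row]) % num_channels].push_back(row);
    }
    return true;
}

int main() {
    RowDistribution dist;
    dist.pending = 3;  // simulate rows waiting on an auto-created partition
    Block block {{1001, 1002, 1001, 1003}};
    std::vector<std::vector<uint32_t>> channel2rows(4);
    if (sink(dist, block, channel2rows.size(), channel2rows)) {
        for (size_t c = 0; c < channel2rows.size(); ++c) {
            std::cout << "channel " << c << ": " << channel2rows[c].size() << " rows\n";
        }
    }
    return 0;
}

Compile with, for example, g++ -std=c++17 sketch.cpp -lz. The recursion terminates because clear_batching_stats() runs before the block is re-sunk, which mirrors the order in the new _send_new_partition_batch.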
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
@@ -96,7 +96,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
static Status empty_callback_function(void* sender, TCreatePartitionResult* result) {
return Status::OK();
}
Status _send_new_partition_batch();
Status _send_new_partition_batch(vectorized::Block* input_block);
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if _random == true
bool only_local_exchange {false};
@@ -127,6 +127,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
// Used to count bytes sent under local data exchange
RuntimeProfile::Counter* _local_bytes_send_counter = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _send_new_partition_timer = nullptr;

RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
1 change: 1 addition & 0 deletions be/src/vec/sink/vrow_distribution.h
@@ -131,6 +131,7 @@ class VRowDistribution {
std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }
size_t batching_rows() const { return _batching_rows; }
// create partitions when need for auto-partition table using #_partitions_need_create.
Status automatic_create_partition();
void clear_batching_stats();
Expand Down