diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index afe9aeab8fdb84..abde34a1d0255b 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -30,8 +30,10 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); - _blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageBlocks", TUnit::BYTES, 1); - _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); + _evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime"); + _compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime"); + _compute_partition_by_timer = ADD_TIMER(profile(), "ComputePartitionByTime"); + _compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime"); return Status::OK(); } @@ -288,35 +290,41 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block } } - for (size_t i = 0; i < _agg_functions_size; - ++i) { //insert _agg_input_columns, execute calculate for its - for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) { - RETURN_IF_ERROR(_insert_range_column( - input_block, local_state._agg_expr_ctxs[i][j], - local_state._shared_state->agg_input_columns[i][j].get(), block_rows)); + { + SCOPED_TIMER(local_state._compute_agg_data_timer); + for (size_t i = 0; i < _agg_functions_size; + ++i) { //insert _agg_input_columns, execute calculate for its + for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) { + RETURN_IF_ERROR(_insert_range_column( + input_block, local_state._agg_expr_ctxs[i][j], + local_state._shared_state->agg_input_columns[i][j].get(), block_rows)); + } } } - //record column idx in block - for (size_t i = 0; i < local_state._shared_state->partition_by_eq_expr_ctxs.size(); ++i) { - int result_col_id = -1; - RETURN_IF_ERROR(local_state._shared_state->partition_by_eq_expr_ctxs[i]->execute( - input_block, &result_col_id)); - DCHECK_GE(result_col_id, 0); - local_state._shared_state->partition_by_column_idxs[i] = result_col_id; + { + SCOPED_TIMER(local_state._compute_partition_by_timer); + for (size_t i = 0; i < local_state._shared_state->partition_by_eq_expr_ctxs.size(); ++i) { + int result_col_id = -1; + RETURN_IF_ERROR(local_state._shared_state->partition_by_eq_expr_ctxs[i]->execute( + input_block, &result_col_id)); + DCHECK_GE(result_col_id, 0); + local_state._shared_state->partition_by_column_idxs[i] = result_col_id; + } } - for (size_t i = 0; i < local_state._shared_state->order_by_eq_expr_ctxs.size(); ++i) { - int result_col_id = -1; - RETURN_IF_ERROR(local_state._shared_state->order_by_eq_expr_ctxs[i]->execute( - input_block, &result_col_id)); - DCHECK_GE(result_col_id, 0); - local_state._shared_state->ordey_by_column_idxs[i] = result_col_id; + { + SCOPED_TIMER(local_state._compute_order_by_timer); + for (size_t i = 0; i < local_state._shared_state->order_by_eq_expr_ctxs.size(); ++i) { + int result_col_id = -1; + RETURN_IF_ERROR(local_state._shared_state->order_by_eq_expr_ctxs[i]->execute( + input_block, &result_col_id)); + DCHECK_GE(result_col_id, 0); + local_state._shared_state->ordey_by_column_idxs[i] = result_col_id; + } } - int64_t block_mem_usage = input_block->allocated_bytes(); - COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage); + COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes()); COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value()); - COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage); //TODO: if need improvement, the is a tips to maintain a free queue, //so the memory could reuse, no need to new/delete again; diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index b35354107f671f..e04b220ee351e7 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -58,7 +58,9 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState _agg_expr_ctxs; }; diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 019f95042c2e4a..7dafbaa0330683 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -162,7 +162,10 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(_init_timer); _blocks_memory_usage = profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1); - _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); + _evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime"); + _execute_timer = ADD_TIMER(profile(), "ExecuteTime"); + _get_next_timer = ADD_TIMER(profile(), "GetNextTime"); + _get_result_timer = ADD_TIMER(profile(), "GetResultsTime"); return Status::OK(); } @@ -233,12 +236,6 @@ Status AnalyticLocalState::open(RuntimeState* state) { std::placeholders::_1); } } - _executor.insert_result = - std::bind(&AnalyticLocalState::_insert_result_info, this, std::placeholders::_1); - _executor.execute = - std::bind(&AnalyticLocalState::_execute_for_win_func, this, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); - _create_agg_status(); return Status::OK(); } @@ -282,6 +279,7 @@ void AnalyticLocalState::_destroy_agg_status() { void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start, int64_t frame_end) { + SCOPED_TIMER(_execute_timer); for (size_t i = 0; i < _agg_functions_size; ++i) { std::vector agg_columns; for (int j = 0; j < _shared_state->agg_input_columns[i].size(); ++j) { @@ -300,6 +298,7 @@ void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t } void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) { + SCOPED_TIMER(_get_result_timer); int64_t current_block_row_pos = _shared_state->input_block_first_row_positions[_output_block_index]; int64_t get_result_start = _shared_state->current_row_position - current_block_row_pos; @@ -344,6 +343,7 @@ void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) { } Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) { + SCOPED_TIMER(_get_next_timer); while (_shared_state->current_row_position < _shared_state->partition_by_end.pos && _window_end_position < current_block_rows) { int64_t range_start, range_end; @@ -367,31 +367,33 @@ Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) { // Make sure range_start <= range_end range_start = std::min(range_start, range_end); } - _executor.execute(_partition_by_start.pos, _shared_state->partition_by_end.pos, range_start, - range_end); - _executor.insert_result(current_block_rows); + _execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos, + range_start, range_end); + _insert_result_info(current_block_rows); } return Status::OK(); } Status AnalyticLocalState::_get_next_for_partition(size_t current_block_rows) { + SCOPED_TIMER(_get_next_timer); if (_next_partition) { - _executor.execute(_partition_by_start.pos, _shared_state->partition_by_end.pos, - _partition_by_start.pos, _shared_state->partition_by_end.pos); + _execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos, + _partition_by_start.pos, _shared_state->partition_by_end.pos); } - _executor.insert_result(current_block_rows); + _insert_result_info(current_block_rows); return Status::OK(); } Status AnalyticLocalState::_get_next_for_range(size_t current_block_rows) { + SCOPED_TIMER(_get_next_timer); while (_shared_state->current_row_position < _shared_state->partition_by_end.pos && _window_end_position < current_block_rows) { if (_shared_state->current_row_position >= _order_by_end.pos) { _update_order_by_range(); - _executor.execute(_partition_by_start.pos, _shared_state->partition_by_end.pos, - _order_by_start.pos, _order_by_end.pos); + _execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos, + _order_by_start.pos, _order_by_end.pos); } - _executor.insert_result(current_block_rows); + _insert_result_info(current_block_rows); } return Status::OK(); } @@ -536,7 +538,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block local_state.init_result_columns(); size_t current_block_rows = local_state._shared_state->input_blocks[local_state._output_block_index].rows(); - static_cast(local_state._executor.get_next(current_block_rows)); + RETURN_IF_ERROR(local_state._executor.get_next(current_block_rows)); if (local_state._window_end_position == current_block_rows) { break; } diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index 0080ad5e03c8b0..8f44b77f567e55 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -96,17 +96,15 @@ class AnalyticLocalState final : public PipelineXLocalState std::vector _agg_functions; RuntimeProfile::Counter* _evaluation_timer = nullptr; + RuntimeProfile::Counter* _execute_timer = nullptr; + RuntimeProfile::Counter* _get_next_timer = nullptr; + RuntimeProfile::Counter* _get_result_timer = nullptr; RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr; - using vectorized_execute = std::function; using vectorized_get_next = std::function; - using vectorized_get_result = std::function; struct executor { - vectorized_execute execute; vectorized_get_next get_next; - vectorized_get_result insert_result; }; executor _executor; diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index 2e9b21976f841a..cace8465fc2d46 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -65,7 +65,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { // 3. lookup the cache and find proper slot order hit_cache = QueryCache::instance()->lookup(_cache_key, _version, &_query_cache_handle); - _runtime_profile->add_info_string("HitCache", hit_cache ? "1" : "0"); + _runtime_profile->add_info_string("HitCache", std::to_string(hit_cache)); if (hit_cache && !cache_param.force_refresh_query_cache) { _hit_cache_results = _query_cache_handle.get_cache_result(); auto hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders(); @@ -125,13 +125,16 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b if (local_state._hit_cache_results == nullptr) { Defer insert_cache([&] { - if (*eos && local_state._need_insert_cache) { - local_state._runtime_profile->add_info_string("InsertCache", "1"); - local_state._global_cache->insert(local_state._cache_key, local_state._version, - local_state._local_cache_blocks, - local_state._slot_orders, - local_state._current_query_cache_bytes); - local_state._local_cache_blocks.clear(); + if (*eos) { + local_state._runtime_profile->add_info_string( + "InsertCache", std::to_string(local_state._need_insert_cache)); + if (local_state._need_insert_cache) { + local_state._global_cache->insert(local_state._cache_key, local_state._version, + local_state._local_cache_blocks, + local_state._slot_orders, + local_state._current_query_cache_bytes); + local_state._local_cache_blocks.clear(); + } } }); @@ -162,7 +165,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b // over the max bytes, pass through the data, no need to do cache local_state._local_cache_blocks.clear(); local_state._need_insert_cache = false; - local_state._runtime_profile->add_info_string("InsertCache", "0"); } else { local_state._local_cache_blocks.emplace_back(std::move(output_block)); } diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 965092b7eef20f..d400953799e5bb 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -70,17 +70,25 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - Status res = local_state._table_func->get_next(state, block, eos); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); + { + SCOPED_TIMER(local_state._table_function_execution_timer); + RETURN_IF_ERROR(local_state._table_func->get_next(state, block, eos)); + } + { + SCOPED_TIMER(local_state._filter_timer); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, + block->columns())); + } local_state.reached_limit(block, eos); - return res; + return Status::OK(); } Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); + _table_function_execution_timer = ADD_TIMER(profile(), "TableFunctionExecutionTime"); + _filter_timer = ADD_TIMER(profile(), "FilterTime"); auto& p = _parent->cast(); _table_func = std::make_shared(p._tuple_id, p._tuple_desc); _table_func->set_tuple_desc(p._tuple_desc); diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h index c63ef97bb7a40f..bada5ec4080d08 100644 --- a/be/src/pipeline/exec/datagen_operator.h +++ b/be/src/pipeline/exec/datagen_operator.h @@ -44,6 +44,8 @@ class DataGenLocalState final : public PipelineXLocalState<> { private: friend class DataGenSourceOperatorX; std::shared_ptr _table_func; + RuntimeProfile::Counter* _table_function_execution_timer = nullptr; + RuntimeProfile::Counter* _filter_timer = nullptr; }; class DataGenSourceOperatorX final : public OperatorX { diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 2ff7a20086470a..13692532a335a4 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -195,7 +195,6 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { private: friend class ExchangeSinkLocalState; - void _set_ready_to_finish(bool all_done); phmap::flat_hash_map> _instance_to_package_queue_mutex; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 55b0e43c936d5a..1f91af01aa1f6b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -39,11 +39,6 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" -Status ExchangeSinkLocalState::serialize_block(vectorized::Block* src, PBlock* dest, - int num_receivers) { - return _parent->cast().serialize_block(*this, src, dest, num_receivers); -} - bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const { return _parent->cast()._transfer_large_data_by_brpc; } @@ -61,14 +56,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _local_sent_rows = ADD_COUNTER(_profile, "LocalSentRows", TUnit::UNIT); _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); _compress_timer = ADD_TIMER(_profile, "CompressTime"); - _brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime"); - _brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait"); _local_send_timer = ADD_TIMER(_profile, "LocalSendTime"); _split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime"); - _split_block_distribute_by_channel_timer = - ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime"); + _distribute_rows_into_channels_timer = ADD_TIMER(_profile, "DistributeRowsIntoChannelsTime"); _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1); - _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); _overall_throughput = _profile->add_derived_counter( "OverallThroughput", TUnit::BYTES_PER_SECOND, [this]() { @@ -141,7 +132,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { std::mt19937 g(rd()); shuffle(channels.begin(), channels.end(), g); } - int local_size = 0; + size_t local_size = 0; for (int i = 0; i < channels.size(); ++i) { RETURN_IF_ERROR(channels[i]->open(state)); if (channels[i]->is_local()) { @@ -151,6 +142,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { } only_local_exchange = local_size == channels.size(); + _rpc_channels_num = channels.size() - local_size; + PUniqueId id; id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); @@ -389,7 +382,6 @@ void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); - COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows()); SCOPED_TIMER(local_state.exec_time_counter()); bool all_receiver_eof = true; for (auto& channel : local_state.channels) { @@ -431,14 +423,15 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block { bool serialized = false; RETURN_IF_ERROR(local_state._serializer.next_serialized_block( - block, block_holder->get_block(), local_state.channels.size(), &serialized, - eos)); + block, block_holder->get_block(), local_state._rpc_channels_num, + &serialized, eos)); if (serialized) { auto cur_block = local_state._serializer.get_block()->to_block(); if (!cur_block.empty()) { + DCHECK(eos || local_state._serializer.is_local()) << debug_string(state, 0); RETURN_IF_ERROR(local_state._serializer.serialize_block( &cur_block, block_holder->get_block(), - local_state.channels.size())); + local_state._rpc_channels_num)); } else { block_holder->reset_block(); } @@ -504,10 +497,12 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block old_channel_mem_usage += channel->mem_usage(); } if (_part_type == TPartitionType::HASH_PARTITIONED) { + SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); RETURN_IF_ERROR(channel_add_rows( state, local_state.channels, local_state._partition_count, local_state._partitioner->get_channel_ids().get(), rows, block, eos)); } else { + SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); RETURN_IF_ERROR(channel_add_rows( state, local_state.channels, local_state._partition_count, local_state._partitioner->get_channel_ids().get(), rows, block, eos)); @@ -556,10 +551,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block local_state._row_distribution._deal_batched = true; RETURN_IF_ERROR(local_state._send_new_partition_batch()); } - // the convert_block maybe different with block after execute exprs - // when send data we still use block - RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, - channel2rows, block, eos)); + { + SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); + // the convert_block maybe different with block after execute exprs + // when send data we still use block + RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, + channel2rows, block, eos)); + } int64_t new_channel_mem_usage = 0; for (const auto& channel : local_state.channels) { new_channel_mem_usage += channel->mem_usage(); @@ -579,8 +577,12 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } std::vector> assignments = local_state.scale_writer_partitioning_exchanger->accept(block); - RETURN_IF_ERROR(channel_add_rows_with_idx( - state, local_state.channels, local_state.channels.size(), assignments, block, eos)); + { + SCOPED_TIMER(local_state._distribute_rows_into_channels_timer); + RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, + local_state.channels.size(), assignments, + block, eos)); + } int64_t new_channel_mem_usage = 0; for (const auto& channel : local_state.channels) { @@ -635,24 +637,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block return final_st; } -Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vectorized::Block* src, - PBlock* dest, int num_receivers) { - { - SCOPED_TIMER(state.serialize_batch_timer()); - dest->Clear(); - size_t uncompressed_bytes = 0; - size_t compressed_bytes = 0; - RETURN_IF_ERROR(src->serialize(_state->be_exec_version(), dest, &uncompressed_bytes, - &compressed_bytes, _compression_type, - _transfer_large_data_by_brpc)); - COUNTER_UPDATE(state.bytes_sent_counter(), compressed_bytes * num_receivers); - COUNTER_UPDATE(state.uncompressed_bytes_counter(), uncompressed_bytes * num_receivers); - COUNTER_UPDATE(state.compress_timer(), src->get_compress_time()); - } - - return Status::OK(); -} - void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buffer) { for (auto& channel : channels) { channel->register_exchange_buffer(buffer); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 141693eb820f4a..63d50290005470 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -77,27 +77,13 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; Dependency* finishdependency() override { return _finish_dependency.get(); } - Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1); void register_channels(pipeline::ExchangeSinkBuffer* buffer); - RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; } RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; } - RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; } RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; } RuntimeProfile::Counter* local_bytes_send_counter() { return _local_bytes_send_counter; } RuntimeProfile::Counter* local_sent_rows() { return _local_sent_rows; } - RuntimeProfile::Counter* brpc_send_timer() { return _brpc_send_timer; } - RuntimeProfile::Counter* serialize_batch_timer() { return _serialize_batch_timer; } - RuntimeProfile::Counter* split_block_distribute_by_channel_timer() { - return _split_block_distribute_by_channel_timer; - } - RuntimeProfile::Counter* bytes_sent_counter() { return _bytes_sent_counter; } - RuntimeProfile::Counter* split_block_hash_compute_timer() { - return _split_block_hash_compute_timer; - } RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; } - RuntimeProfile::Counter* compress_timer() { return _compress_timer; } - RuntimeProfile::Counter* uncompressed_bytes_counter() { return _uncompressed_bytes_counter; } [[nodiscard]] bool transfer_large_data_by_brpc() const; bool is_finished() const override { return _reach_limit.load(); } void set_reach_limit() { _reach_limit = true; }; @@ -129,16 +115,13 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { std::unique_ptr _sink_buffer = nullptr; RuntimeProfile::Counter* _serialize_batch_timer = nullptr; RuntimeProfile::Counter* _compress_timer = nullptr; - RuntimeProfile::Counter* _brpc_send_timer = nullptr; - RuntimeProfile::Counter* _brpc_wait_timer = nullptr; RuntimeProfile::Counter* _bytes_sent_counter = nullptr; RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr; RuntimeProfile::Counter* _local_sent_rows = nullptr; RuntimeProfile::Counter* _local_send_timer = nullptr; RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr; - RuntimeProfile::Counter* _split_block_distribute_by_channel_timer = nullptr; + RuntimeProfile::Counter* _distribute_rows_into_channels_timer = nullptr; RuntimeProfile::Counter* _blocks_sent_counter = nullptr; - RuntimeProfile::Counter* _rows_sent_counter = nullptr; // Throughput per total time spent in sender RuntimeProfile::Counter* _overall_throughput = nullptr; // Used to counter send bytes under local data exchange @@ -153,6 +136,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { int _sender_id; std::shared_ptr _broadcast_pb_mem_limiter; + size_t _rpc_channels_num = 0; vectorized::BlockSerializer _serializer; std::shared_ptr _queue_dependency = nullptr; @@ -221,8 +205,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorXcreate_merger( local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first, state->batch_size(), _limit, _offset)); local_state.is_ready = true; return Status::OK(); } - auto status = local_state.stream_recvr->get_next(block, eos); - RETURN_IF_ERROR(doris::vectorized::VExprContext::filter_block(local_state.conjuncts(), block, - block->columns())); + { + SCOPED_TIMER(local_state.get_data_from_recvr_timer); + RETURN_IF_ERROR(local_state.stream_recvr->get_next(block, eos)); + } + { + SCOPED_TIMER(local_state.filter_timer); + RETURN_IF_ERROR(doris::vectorized::VExprContext::filter_block(local_state.conjuncts(), + block, block->columns())); + } // In vsortrunmerger, it will set eos=true, and block not empty // so that eos==true, could not make sure that block not have valid data if (!*eos || block->rows() > 0) { @@ -176,7 +187,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block local_state.set_num_rows_returned(_limit); } } - return status; + return Status::OK(); } Status ExchangeLocalState::close(RuntimeState* state) { diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index c8ef674d269853..f938f5007d1643 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -59,6 +59,9 @@ class ExchangeLocalState final : public PipelineXLocalState<> { std::vector> deps; std::vector metrics; + RuntimeProfile::Counter* get_data_from_recvr_timer = nullptr; + RuntimeProfile::Counter* filter_timer = nullptr; + RuntimeProfile::Counter* create_merger_timer = nullptr; }; class ExchangeSourceOperatorX final : public OperatorX { diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 93026427b86d56..7c9c38ece5c4e9 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -85,12 +85,6 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i SCOPED_TIMER(_init_timer); _sender_id = info.sender_id; - _brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait"); - _local_send_timer = ADD_TIMER(_profile, "LocalSendTime"); - _brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime"); - _split_block_distribute_by_channel_timer = - ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime"); - _brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime"); auto& p = _parent->cast(); CHECK(p._file_opts.get() != nullptr); // create sender diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index 7268efe4de4065..e9f2b8eeb9c670 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -40,26 +40,12 @@ class ResultFileSinkLocalState final [[nodiscard]] int sender_id() const { return _sender_id; } - RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; } - RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; } - RuntimeProfile::Counter* brpc_send_timer() { return _brpc_send_timer; } - RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; } - RuntimeProfile::Counter* split_block_distribute_by_channel_timer() { - return _split_block_distribute_by_channel_timer; - } - private: friend class ResultFileSinkOperatorX; std::shared_ptr _sender; std::shared_ptr _block_holder; - RuntimeProfile::Counter* _brpc_wait_timer = nullptr; - RuntimeProfile::Counter* _local_send_timer = nullptr; - RuntimeProfile::Counter* _brpc_send_timer = nullptr; - RuntimeProfile::Counter* _merge_block_timer = nullptr; - RuntimeProfile::Counter* _split_block_distribute_by_channel_timer = nullptr; - int _sender_id; }; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index f04ace2e292595..99c20e3c2e6442 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -46,7 +46,6 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) auto fragment_instance_id = state->fragment_instance_id(); _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1); - _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); if (state->query_options().enable_parallel_result_sink) { _sender = _parent->cast()._sender; @@ -146,7 +145,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) { Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); COUNTER_UPDATE(local_state.blocks_sent_counter(), 1); if (_fetch_option.use_two_phase_fetch && block->rows() > 0) { RETURN_IF_ERROR(_second_phase_fetch_data(state, block)); diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 3c503096ecb51e..ec5d480c38d178 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -129,7 +129,6 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState _sender = nullptr; std::shared_ptr _writer = nullptr; RuntimeProfile::Counter* _blocks_sent_counter = nullptr; - RuntimeProfile::Counter* _rows_sent_counter = nullptr; }; class ResultSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index ac820bcab2929a..8460a62883dad9 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -238,14 +238,13 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t } { + SCOPED_TIMER(_parent->merge_block_timer()); if (rows) { if (!rows->empty()) { - SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer()); const auto* begin = rows->data(); RETURN_IF_ERROR(_mutable_block->add_rows(block, begin, begin + rows->size())); } } else if (!block->empty()) { - SCOPED_TIMER(_parent->merge_block_timer()); RETURN_IF_ERROR(_mutable_block->merge(*block)); } } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 88bb804fd8004f..024d87ab32f49c 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -86,6 +86,7 @@ class BlockSerializer { void reset_block() { _mutable_block.reset(); } void set_is_local(bool is_local) { _is_local = is_local; } + bool is_local() const { return _is_local; } private: pipeline::ExchangeSinkLocalState* _parent;