Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(_profile, "MemoryUsageBlocks", TUnit::BYTES, 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
_evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime");
_compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime");
_compute_partition_by_timer = ADD_TIMER(profile(), "ComputePartitionByTime");
_compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime");
return Status::OK();
}

Expand Down Expand Up @@ -288,35 +290,41 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
}
}

for (size_t i = 0; i < _agg_functions_size;
++i) { //insert _agg_input_columns, execute calculate for its
for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) {
RETURN_IF_ERROR(_insert_range_column(
input_block, local_state._agg_expr_ctxs[i][j],
local_state._shared_state->agg_input_columns[i][j].get(), block_rows));
{
SCOPED_TIMER(local_state._compute_agg_data_timer);
for (size_t i = 0; i < _agg_functions_size;
++i) { //insert _agg_input_columns, execute calculate for its
for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) {
RETURN_IF_ERROR(_insert_range_column(
input_block, local_state._agg_expr_ctxs[i][j],
local_state._shared_state->agg_input_columns[i][j].get(), block_rows));
}
}
}
//record column idx in block
for (size_t i = 0; i < local_state._shared_state->partition_by_eq_expr_ctxs.size(); ++i) {
int result_col_id = -1;
RETURN_IF_ERROR(local_state._shared_state->partition_by_eq_expr_ctxs[i]->execute(
input_block, &result_col_id));
DCHECK_GE(result_col_id, 0);
local_state._shared_state->partition_by_column_idxs[i] = result_col_id;
{
SCOPED_TIMER(local_state._compute_partition_by_timer);
for (size_t i = 0; i < local_state._shared_state->partition_by_eq_expr_ctxs.size(); ++i) {
int result_col_id = -1;
RETURN_IF_ERROR(local_state._shared_state->partition_by_eq_expr_ctxs[i]->execute(
input_block, &result_col_id));
DCHECK_GE(result_col_id, 0);
local_state._shared_state->partition_by_column_idxs[i] = result_col_id;
}
}

for (size_t i = 0; i < local_state._shared_state->order_by_eq_expr_ctxs.size(); ++i) {
int result_col_id = -1;
RETURN_IF_ERROR(local_state._shared_state->order_by_eq_expr_ctxs[i]->execute(
input_block, &result_col_id));
DCHECK_GE(result_col_id, 0);
local_state._shared_state->ordey_by_column_idxs[i] = result_col_id;
{
SCOPED_TIMER(local_state._compute_order_by_timer);
for (size_t i = 0; i < local_state._shared_state->order_by_eq_expr_ctxs.size(); ++i) {
int result_col_id = -1;
RETURN_IF_ERROR(local_state._shared_state->order_by_eq_expr_ctxs[i]->execute(
input_block, &result_col_id));
DCHECK_GE(result_col_id, 0);
local_state._shared_state->ordey_by_column_idxs[i] = result_col_id;
}
}

int64_t block_mem_usage = input_block->allocated_bytes();
COUNTER_UPDATE(local_state._memory_used_counter, block_mem_usage);
COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes());
COUNTER_SET(local_state._peak_memory_usage_counter, local_state._memory_used_counter->value());
COUNTER_UPDATE(local_state._blocks_memory_usage, block_mem_usage);

//TODO: if need improvement, the is a tips to maintain a free queue,
//so the memory could reuse, no need to new/delete again;
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
bool _whether_need_next_partition(BlockRowPos& found_partition_end);

RuntimeProfile::Counter* _evaluation_timer = nullptr;
RuntimeProfile::Counter* _blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
RuntimeProfile::Counter* _compute_order_by_timer = nullptr;

std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
};
Expand Down
36 changes: 19 additions & 17 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
SCOPED_TIMER(_init_timer);
_blocks_memory_usage =
profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1);
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
_evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime");
_execute_timer = ADD_TIMER(profile(), "ExecuteTime");
_get_next_timer = ADD_TIMER(profile(), "GetNextTime");
_get_result_timer = ADD_TIMER(profile(), "GetResultsTime");
return Status::OK();
}

Expand Down Expand Up @@ -233,12 +236,6 @@ Status AnalyticLocalState::open(RuntimeState* state) {
std::placeholders::_1);
}
}
_executor.insert_result =
std::bind<void>(&AnalyticLocalState::_insert_result_info, this, std::placeholders::_1);
_executor.execute =
std::bind<void>(&AnalyticLocalState::_execute_for_win_func, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);

_create_agg_status();
return Status::OK();
}
Expand Down Expand Up @@ -282,6 +279,7 @@ void AnalyticLocalState::_destroy_agg_status() {

void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t partition_end,
int64_t frame_start, int64_t frame_end) {
SCOPED_TIMER(_execute_timer);
for (size_t i = 0; i < _agg_functions_size; ++i) {
std::vector<const vectorized::IColumn*> agg_columns;
for (int j = 0; j < _shared_state->agg_input_columns[i].size(); ++j) {
Expand All @@ -300,6 +298,7 @@ void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t
}

void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) {
SCOPED_TIMER(_get_result_timer);
int64_t current_block_row_pos =
_shared_state->input_block_first_row_positions[_output_block_index];
int64_t get_result_start = _shared_state->current_row_position - current_block_row_pos;
Expand Down Expand Up @@ -344,6 +343,7 @@ void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) {
}

Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) {
SCOPED_TIMER(_get_next_timer);
while (_shared_state->current_row_position < _shared_state->partition_by_end.pos &&
_window_end_position < current_block_rows) {
int64_t range_start, range_end;
Expand All @@ -367,31 +367,33 @@ Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) {
// Make sure range_start <= range_end
range_start = std::min(range_start, range_end);
}
_executor.execute(_partition_by_start.pos, _shared_state->partition_by_end.pos, range_start,
range_end);
_executor.insert_result(current_block_rows);
_execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos,
range_start, range_end);
_insert_result_info(current_block_rows);
}
return Status::OK();
}

Status AnalyticLocalState::_get_next_for_partition(size_t current_block_rows) {
SCOPED_TIMER(_get_next_timer);
if (_next_partition) {
_executor.execute(_partition_by_start.pos, _shared_state->partition_by_end.pos,
_partition_by_start.pos, _shared_state->partition_by_end.pos);
_execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos,
_partition_by_start.pos, _shared_state->partition_by_end.pos);
}
_executor.insert_result(current_block_rows);
_insert_result_info(current_block_rows);
return Status::OK();
}

Status AnalyticLocalState::_get_next_for_range(size_t current_block_rows) {
SCOPED_TIMER(_get_next_timer);
while (_shared_state->current_row_position < _shared_state->partition_by_end.pos &&
_window_end_position < current_block_rows) {
if (_shared_state->current_row_position >= _order_by_end.pos) {
_update_order_by_range();
_executor.execute(_partition_by_start.pos, _shared_state->partition_by_end.pos,
_order_by_start.pos, _order_by_end.pos);
_execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos,
_order_by_start.pos, _order_by_end.pos);
}
_executor.insert_result(current_block_rows);
_insert_result_info(current_block_rows);
}
return Status::OK();
}
Expand Down Expand Up @@ -536,7 +538,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
local_state.init_result_columns();
size_t current_block_rows =
local_state._shared_state->input_blocks[local_state._output_block_index].rows();
static_cast<void>(local_state._executor.get_next(current_block_rows));
RETURN_IF_ERROR(local_state._executor.get_next(current_block_rows));
if (local_state._window_end_position == current_block_rows) {
break;
}
Expand Down
8 changes: 3 additions & 5 deletions be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,15 @@ class AnalyticLocalState final : public PipelineXLocalState<AnalyticSharedState>
std::vector<vectorized::AggFnEvaluator*> _agg_functions;

RuntimeProfile::Counter* _evaluation_timer = nullptr;
RuntimeProfile::Counter* _execute_timer = nullptr;
RuntimeProfile::Counter* _get_next_timer = nullptr;
RuntimeProfile::Counter* _get_result_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;

using vectorized_execute = std::function<void(int64_t peer_group_start, int64_t peer_group_end,
int64_t frame_start, int64_t frame_end)>;
using vectorized_get_next = std::function<Status(size_t rows)>;
using vectorized_get_result = std::function<void(int64_t current_block_rows)>;

struct executor {
vectorized_execute execute;
vectorized_get_next get_next;
vectorized_get_result insert_result;
};

executor _executor;
Expand Down
20 changes: 11 additions & 9 deletions be/src/pipeline/exec/cache_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {

// 3. lookup the cache and find proper slot order
hit_cache = QueryCache::instance()->lookup(_cache_key, _version, &_query_cache_handle);
_runtime_profile->add_info_string("HitCache", hit_cache ? "1" : "0");
_runtime_profile->add_info_string("HitCache", std::to_string(hit_cache));
if (hit_cache && !cache_param.force_refresh_query_cache) {
_hit_cache_results = _query_cache_handle.get_cache_result();
auto hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders();
Expand Down Expand Up @@ -125,13 +125,16 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b

if (local_state._hit_cache_results == nullptr) {
Defer insert_cache([&] {
if (*eos && local_state._need_insert_cache) {
local_state._runtime_profile->add_info_string("InsertCache", "1");
local_state._global_cache->insert(local_state._cache_key, local_state._version,
local_state._local_cache_blocks,
local_state._slot_orders,
local_state._current_query_cache_bytes);
local_state._local_cache_blocks.clear();
if (*eos) {
local_state._runtime_profile->add_info_string(
"InsertCache", std::to_string(local_state._need_insert_cache));
if (local_state._need_insert_cache) {
local_state._global_cache->insert(local_state._cache_key, local_state._version,
local_state._local_cache_blocks,
local_state._slot_orders,
local_state._current_query_cache_bytes);
local_state._local_cache_blocks.clear();
}
}
});

Expand Down Expand Up @@ -162,7 +165,6 @@ Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
// over the max bytes, pass through the data, no need to do cache
local_state._local_cache_blocks.clear();
local_state._need_insert_cache = false;
local_state._runtime_profile->add_info_string("InsertCache", "0");
} else {
local_state._local_cache_blocks.emplace_back(std::move(output_block));
}
Expand Down
16 changes: 12 additions & 4 deletions be/src/pipeline/exec/datagen_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,25 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
Status res = local_state._table_func->get_next(state, block, eos);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
{
SCOPED_TIMER(local_state._table_function_execution_timer);
RETURN_IF_ERROR(local_state._table_func->get_next(state, block, eos));
}
{
SCOPED_TIMER(local_state._filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
}
local_state.reached_limit(block, eos);
return res;
return Status::OK();
}

Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
_table_function_execution_timer = ADD_TIMER(profile(), "TableFunctionExecutionTime");
_filter_timer = ADD_TIMER(profile(), "FilterTime");
auto& p = _parent->cast<DataGenSourceOperatorX>();
_table_func = std::make_shared<VNumbersTVF>(p._tuple_id, p._tuple_desc);
_table_func->set_tuple_desc(p._tuple_desc);
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/datagen_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class DataGenLocalState final : public PipelineXLocalState<> {
private:
friend class DataGenSourceOperatorX;
std::shared_ptr<VDataGenFunctionInf> _table_func;
RuntimeProfile::Counter* _table_function_execution_timer = nullptr;
RuntimeProfile::Counter* _filter_timer = nullptr;
};

class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {

private:
friend class ExchangeSinkLocalState;
void _set_ready_to_finish(bool all_done);

phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
Expand Down
Loading