Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,13 +543,6 @@ Status AggLocalState::close(RuntimeState* state) {
_agg_data->method_variant);
}

_shared_state->agg_data = nullptr;
_shared_state->aggregate_data_container = nullptr;
_shared_state->agg_arena_pool = nullptr;
_shared_state->agg_profile_arena = nullptr;

std::vector<vectorized::AggregateDataPtr> tmp_values;
_shared_state->values.swap(tmp_values);
return Base::close(state);
}

Expand Down
18 changes: 4 additions & 14 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,19 +330,6 @@ Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
return Status::OK();
}

void AnalyticLocalState::release_mem() {
_agg_arena_pool = nullptr;

std::vector<vectorized::Block> tmp_input_blocks;
_shared_state->input_blocks.swap(tmp_input_blocks);

std::vector<std::vector<vectorized::MutableColumnPtr>> tmp_agg_input_columns;
_shared_state->agg_input_columns.swap(tmp_agg_input_columns);

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
}

AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: OperatorX<AnalyticLocalState>(pool, tnode, descs),
Expand Down Expand Up @@ -443,7 +430,10 @@ Status AnalyticLocalState::close(RuntimeState* state) {
}

static_cast<void>(_destroy_agg_status());
release_mem();
_agg_arena_pool = nullptr;

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
return PipelineXLocalState<AnalyticDependency>::close(state);
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class AnalyticLocalState final : public PipelineXLocalState<AnalyticDependency>

Status output_current_block(vectorized::Block* block);

void release_mem();

bool init_next_partition(vectorized::BlockRowPos found_partition_end);

private:
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
(_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
(_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
(_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_ANTI_JOIN);

//when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
//we could get the result is probe table + null-column(if need output)
_shared_state->empty_right_table_need_probe_dispose =
(_shared_state->build_blocks->empty() && !p._have_other_join_conjunct &&
!p._is_mark_join) &&
(p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN ||
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
}

Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
Expand Down
156 changes: 109 additions & 47 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) {
}},
*_process_hashtable_ctx_variants);
}
_shared_state->arena = nullptr;
_shared_state->hash_table_variants.reset();
_process_hashtable_ctx_variants = nullptr;
_null_map_column = nullptr;
_tuple_is_null_left_flag_column = nullptr;
Expand Down Expand Up @@ -190,46 +188,96 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
return Status::OK();
}
if (local_state._shared_state->_has_null_in_build_side &&
_short_circuit_for_null_in_build_side) {
_short_circuit_for_null_in_build_side && _is_mark_join) {
/// `_has_null_in_build_side` means have null value in build side.
/// `_short_circuit_for_null_in_build_side` means short circuit if has null in build side(e.g. null aware left anti join).
/// We need to create a column as mark with all rows set to NULL.
if (_is_mark_join) {
auto block_rows = local_state._probe_block.rows();
if (block_rows == 0) {
if (local_state._probe_eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
auto block_rows = local_state._probe_block.rows();
if (block_rows == 0) {
if (local_state._probe_eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
}

vectorized::Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
if (_left_output_slot_flags[i]) {
temp_block.insert(local_state._probe_block.get_by_position(i));
}
}
auto mark_column = vectorized::ColumnNullable::create(
vectorized::ColumnUInt8::create(block_rows, 0),
vectorized::ColumnUInt8::create(block_rows, 1));
temp_block.insert({std::move(mark_column),
make_nullable(std::make_shared<vectorized::DataTypeUInt8>()), ""});

{
SCOPED_TIMER(local_state._join_filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, &temp_block, temp_block.columns()));
vectorized::Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
if (_left_output_slot_flags[i]) {
temp_block.insert(local_state._probe_block.get_by_position(i));
}
}
auto mark_column =
vectorized::ColumnNullable::create(vectorized::ColumnUInt8::create(block_rows, 0),
vectorized::ColumnUInt8::create(block_rows, 1));
temp_block.insert({std::move(mark_column),
make_nullable(std::make_shared<vectorized::DataTypeUInt8>()), ""});

RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false));
temp_block.clear();
local_state._probe_block.clear_column_data(
_child_x->row_desc().num_materialized_slots());
local_state.reached_limit(output_block, source_state);
{
SCOPED_TIMER(local_state._join_filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, &temp_block, temp_block.columns()));
}

RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false));
temp_block.clear();
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
local_state.reached_limit(output_block, source_state);
return Status::OK();
}

//TODO: this short circuit maybe could refactor, no need to check at here.
if (local_state._shared_state->empty_right_table_need_probe_dispose) {
// when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
// we could get the result is probe table + null-column(if need output)
// If we use a short-circuit strategy, should return block directly by add additional null data.
auto block_rows = local_state._probe_block.rows();
if (local_state._probe_eos && block_rows == 0) {
if (local_state._probe_eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
}

vectorized::Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
temp_block.insert(local_state._probe_block.get_by_position(i));
}

//create build side null column, if need output
for (int i = 0;
(_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) {
auto type = remove_nullable(_right_table_data_types[i]);
auto column = type->create_column();
column->resize(block_rows);
auto null_map_column =
vectorized::ColumnVector<vectorized::UInt8>::create(block_rows, 1);
auto nullable_column = vectorized::ColumnNullable::create(std::move(column),
std::move(null_map_column));
temp_block.insert({std::move(nullable_column), make_nullable(type),
_right_table_column_names[i]});
}
if (_is_outer_join) {
reinterpret_cast<vectorized::ColumnUInt8*>(
local_state._tuple_is_null_left_flag_column.get())
->get_data()
.resize_fill(block_rows, 0);
reinterpret_cast<vectorized::ColumnUInt8*>(
local_state._tuple_is_null_right_flag_column.get())
->get_data()
.resize_fill(block_rows, 1);
}

/// No need to check the block size in `_filter_data_and_build_output` because here dose not
/// increase the output rows count(just same as `_probe_block`'s rows count).
RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, source_state,
&temp_block, false));
temp_block.clear();
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
return Status::OK();
}

local_state._join_block.clear_column_data();

vectorized::MutableBlock mutable_join_block(&local_state._join_block);
Expand Down Expand Up @@ -298,33 +346,45 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
if (!st) {
return st;
}
if (_is_outer_join) {
local_state.add_tuple_is_null_column(&temp_block);
}
auto output_rows = temp_block.rows();
DCHECK(output_rows <= state->batch_size());
{
SCOPED_TIMER(local_state._join_filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, &temp_block,
temp_block.columns()));
}

RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, source_state,
&temp_block));
// Here make _join_block release the columns' ptr
local_state._join_block.set_columns(local_state._join_block.clone_empty_columns());
mutable_join_block.clear();
return Status::OK();
}

Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state,
vectorized::Block* output_block,
SourceState& source_state,
vectorized::Block* temp_block,
bool check_rows_count) {
auto& p = _parent->cast<HashJoinProbeOperatorX>();
if (p._is_outer_join) {
add_tuple_is_null_column(temp_block);
}
auto output_rows = temp_block->rows();
if (check_rows_count) {
DCHECK(output_rows <= state->batch_size());
}
{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, temp_block,
temp_block->columns()));
}

RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false));
local_state._reset_tuple_is_null_column();
local_state.reached_limit(output_block, source_state);
RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
_reset_tuple_is_null_column();
reached_limit(output_block, source_state);
return Status::OK();
}

bool HashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const {
auto& local_state = state->get_local_state(id())->cast<HashJoinProbeLocalState>();
return (local_state._probe_block.rows() == 0 ||
local_state._probe_index == local_state._probe_block.rows()) &&
!local_state._probe_eos &&
(!local_state._shared_state->short_circuit_for_probe || _is_mark_join);
!local_state._probe_eos && !local_state._shared_state->short_circuit_for_probe;
}

Status HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block,
Expand Down Expand Up @@ -465,6 +525,8 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
_right_table_data_types =
vectorized::VectorizedUtils::get_data_types(_build_side_child->row_desc());
_left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child_x->row_desc());
_right_table_column_names =
vectorized::VectorizedUtils::get_column_names(_build_side_child->row_desc());
_build_side_child.reset();
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class HashJoinProbeLocalState final
void prepare_for_next();
void add_tuple_is_null_column(vectorized::Block* block) override;
void init_for_probe(RuntimeState* state);
Status filter_data_and_build_output(RuntimeState* state, vectorized::Block* output_block,
SourceState& source_state, vectorized::Block* temp_block,
bool check_rows_count = true);

HashJoinProbeOperatorX* join_probe() { return (HashJoinProbeOperatorX*)_parent; }

Expand Down Expand Up @@ -135,6 +138,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
std::vector<std::string> _right_table_column_names;
};

} // namespace pipeline
Expand Down
6 changes: 0 additions & 6 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ Status NestedLoopJoinProbeLocalState::close(RuntimeState* state) {
}
_child_block->clear();

vectorized::Blocks tmp_build_blocks;
_shared_state->build_blocks.swap(tmp_build_blocks);

vectorized::MutableColumns tmp_build_side_visited_flags;
_shared_state->build_side_visited_flags.swap(tmp_build_side_visited_flags);

_tuple_is_null_left_flag_column = nullptr;
_tuple_is_null_right_flag_column = nullptr;
return JoinProbeLocalState<NestedLoopJoinDependency, NestedLoopJoinProbeLocalState>::close(
Expand Down
15 changes: 7 additions & 8 deletions be/src/pipeline/exec/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSourceOperator>(this, _node);
}

Status PartitionSortSourceLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state, info));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_close_timer);
_shared_state->previous_row = nullptr;
_shared_state->partition_sorts.clear();
return PipelineXLocalState<PartitionSortDependency>::close(state);
SCOPED_TIMER(_open_timer);
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>();
return Status::OK();
}

Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* output_block,
Expand Down
23 changes: 7 additions & 16 deletions be/src/pipeline/exec/partition_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,17 @@ class PartitionSortSourceLocalState final : public PipelineXLocalState<Partition
using Base = PipelineXLocalState<PartitionSortDependency>;
PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<PartitionSortDependency>(state, parent),
_get_next_timer(nullptr) {}
_get_sorted_timer(nullptr),
_get_next_timer(nullptr),
_num_rows_returned(0) {}

Status init(RuntimeState* state, LocalStateInfo& info) override {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state, info));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>();
return Status::OK();
}

Status close(RuntimeState* state) override;

int64_t _num_rows_returned = 0;
Status init(RuntimeState* state, LocalStateInfo& info) override;

private:
friend class PartitionSortSourceOperatorX;
RuntimeProfile::Counter* _get_sorted_timer = nullptr;
RuntimeProfile::Counter* _get_next_timer = nullptr;
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _get_next_timer;
int64_t _num_rows_returned;
};

class PartitionSortSourceOperatorX final : public OperatorX<PartitionSortSourceLocalState> {
Expand Down
10 changes: 0 additions & 10 deletions be/src/pipeline/exec/sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,4 @@ Dependency* SortSourceOperatorX::wait_for_dependency(RuntimeState* state) {
return local_state._dependency->read_blocked_by();
}

Status SortLocalState::close(RuntimeState* state) {
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_close_timer);
if (_closed) {
return Status::OK();
}
_shared_state->sorter = nullptr;
return PipelineXLocalState<SortDependency>::close(state);
}

} // namespace doris::pipeline
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ class SortSourceOperator final : public SourceOperator<SortSourceOperatorBuilder

class SortSourceOperatorX;
class SortLocalState final : public PipelineXLocalState<SortDependency> {
ENABLE_FACTORY_CREATOR(SortLocalState);

public:
ENABLE_FACTORY_CREATOR(SortLocalState);
SortLocalState(RuntimeState* state, OperatorXBase* parent);

Status init(RuntimeState* state, LocalStateInfo& info) override;
Status close(RuntimeState* state) override;

private:
friend class SortSourceOperatorX;
Expand Down
Loading