Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class DistinctStreamingAggOperatorX final

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
35 changes: 21 additions & 14 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
id, p._dest_node_id, _sender_id, _state->be_number(), state, this);

register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
if (!only_local_exchange) {
_sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
id, p._dest_node_id, _sender_id, _state->be_number(), state, this);
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_finish_dependency->block();
}

if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency = Dependency::create_shared(
Expand Down Expand Up @@ -298,7 +302,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
fmt::format("Crc32HashPartitioner({})", _partition_count));
}

_finish_dependency->block();
if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
Expand Down Expand Up @@ -592,8 +595,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
final_st = st;
}
}
local_state._sink_buffer->set_should_stop();
return final_st;
if (local_state._sink_buffer) {
local_state._sink_buffer->set_should_stop();
}
}
return final_st;
}
Expand Down Expand Up @@ -679,11 +683,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer,
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
"_reach_limit: {}",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
_sink_buffer->_is_finishing.load(), _reach_limit.load());
if (_sink_buffer) {
fmt::format_to(
debug_string_buffer,
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
"_reach_limit: {}",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
_sink_buffer->_is_finishing.load(), _reach_limit.load());
}
return fmt::to_string(debug_string_buffer);
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

std::vector<Dependency*> dependencies() const override {
std::vector<Dependency*> dep_vec;
dep_vec.push_back(_queue_dependency.get());
if (_queue_dependency) {
dep_vec.push_back(_queue_dependency.get());
}
if (_broadcast_dependency) {
dep_vec.push_back(_broadcast_dependency.get());
}
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,18 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
}

Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (_should_build_hash_table && p._shared_hashtable_controller) {
p._shared_hashtable_controller->signal_finish(p.node_id());
}
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
return Status::OK();
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
return Base::close(state, exec_status);
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
Expand All @@ -148,7 +151,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
return Status::OK();
return Base::close(state, exec_status);
}

bool HashJoinBuildSinkLocalState::build_unique() const {
Expand Down Expand Up @@ -519,6 +522,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class HashJoinBuildSinkOperatorX final
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
11 changes: 7 additions & 4 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,13 @@ class OperatorBase {

virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
_followed_by_shuffled_join = followed_by_shuffled_join;
[[nodiscard]] bool followed_by_shuffled_operator() const {
return _followed_by_shuffled_operator;
}
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
_followed_by_shuffled_operator = followed_by_shuffled_operator;
}
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }

protected:
Expand All @@ -268,7 +271,7 @@ class OperatorBase {
OperatorXPtr _child_x = nullptr;

bool _is_closed;
bool _followed_by_shuffled_join = false;
bool _followed_by_shuffled_operator = false;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ class PartitionedAggSinkLocalState

std::unique_ptr<RuntimeState> _runtime_state;

bool _eos = false;
std::shared_ptr<Dependency> _finish_dependency;

// temp structures during spilling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class PartitionedHashJoinProbeOperatorX final
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class PartitionedHashJoinSinkOperatorX
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
} while (block->rows() == 0 && !*eos);

local_state.reached_limit(block, eos);
if (*eos) {
local_state._finish_dependency->set_always_ready();
}
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
} else if (_merge_by_exchange) {
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha

RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;

bool _eos = false;
vectorized::SpillStreamSPtr _spilling_stream;
std::shared_ptr<Dependency> _finish_dependency;
};
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/union_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
}
}

bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }

Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
return Status::OK();
}
[[nodiscard]] int get_child_count() const { return _child_size; }
bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
bool _has_data(RuntimeState* state) const {
Expand Down
11 changes: 10 additions & 1 deletion be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <utility>

#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"

namespace doris::pipeline {

Expand Down Expand Up @@ -99,4 +100,12 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) {
return Status::OK();
}

} // namespace doris::pipeline
void Pipeline::make_all_runnable() {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
}
}
}

} // namespace doris::pipeline
17 changes: 16 additions & 1 deletion be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
std::weak_ptr<PipelineFragmentContext> context)
: _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) {
_init_profile();
_tasks.resize(_num_tasks, nullptr);
}

void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
Expand Down Expand Up @@ -155,14 +156,24 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }

void incr_created_tasks() { _num_tasks_created++; }
void incr_created_tasks(int i, PipelineTask* task) {
_num_tasks_created++;
_num_tasks_running++;
DCHECK_LT(i, _tasks.size());
_tasks[i] = task;
}

void make_all_runnable();

void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
_tasks.resize(_num_tasks, nullptr);
for (auto& op : operatorXs) {
op->set_parallel_tasks(_num_tasks);
}
}
int num_tasks() const { return _num_tasks; }
bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }

std::string debug_string() {
fmt::memory_buffer debug_string_buffer;
Expand Down Expand Up @@ -243,6 +254,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
int _num_tasks = 1;
// How many tasks are already created?
std::atomic<int> _num_tasks_created = 0;
// How many tasks are already created and not finished?
std::atomic<int> _num_tasks_running = 0;
// Tasks in this pipeline.
std::vector<PipelineTask*> _tasks;
};

} // namespace doris::pipeline
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ void PipelineFragmentContext::close_if_prepare_failed(Status /*st*/) {
DCHECK(!task->is_pending_finish());
WARN_IF_ERROR(task->close(Status::OK()),
fmt::format("Query {} closed since prepare failed", print_id(_query_id)));
close_a_pipeline();
close_a_pipeline(task->pipeline_id());
}
}

Expand Down Expand Up @@ -960,7 +960,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}

void PipelineFragmentContext::close_a_pipeline() {
void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
std::lock_guard<std::mutex> l(_task_mutex);
g_pipeline_tasks_count << -1;
++_closed_tasks;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

[[nodiscard]] int get_fragment_id() const { return _fragment_id; }

void close_a_pipeline();
virtual void close_a_pipeline(PipelineId pipeline_id);

virtual void clear_finished_tasks() {}

Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ class PipelineTask {

std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }

PipelineId pipeline_id() const { return _pipeline->id(); }

virtual void clear_blocking_state(bool wake_up_by_downstream = false) {}

protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
Expand Down
Loading