Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ std::shared_ptr<BroadcastPBlockHolder> BroadcastPBlockHolderQueue::pop() {
namespace pipeline {

ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
int be_number, RuntimeState* state)
int be_number, RuntimeState* state,
ExchangeSinkLocalState* parent)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
Expand All @@ -93,9 +94,8 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_
_sender_id(send_id),
_be_number(be_number),
_state(state),
_context(state->get_query_ctx()) {}

ExchangeSinkBuffer::~ExchangeSinkBuffer() = default;
_context(state->get_query_ctx()),
_parent(parent) {}

void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
Expand Down Expand Up @@ -213,8 +213,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
_instance_to_broadcast_package_queue[id];

if (_is_finishing) {
_rpc_channel_is_idle[id] = true;
_set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
_turn_off_channel(id);
return Status::OK();
}

Expand Down Expand Up @@ -372,8 +371,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
broadcast_q.pop();
} else {
_rpc_channel_is_idle[id] = true;
_set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
_turn_off_channel(id);
}

return Status::OK();
Expand Down Expand Up @@ -403,26 +401,21 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
_set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
}
_turn_off_channel(id);
}
}

void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
_context->cancel(Status::Cancelled(err));
_ended(id);
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_turn_off_channel(id, true);
}

void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
_set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
}
_turn_off_channel(id, true);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, _instance_to_broadcast_package_queue[id]);
}
Expand All @@ -432,6 +425,20 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
return _instance_to_receiver_eof[id];
}

void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) {
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
auto all_done = _busy_channels.fetch_sub(1) == 1;
_set_ready_to_finish(all_done);
if (cleanup && all_done) {
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
_parent->set_reach_limit();
}
}
}
}

void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
int64_t local_max_time = 0;
int64_t local_min_time = INT64_MAX;
Expand Down
11 changes: 7 additions & 4 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ using InstanceLoId = int64_t;

namespace pipeline {
class Dependency;
class ExchangeSinkLocalState;
} // namespace pipeline

namespace vectorized {
Expand Down Expand Up @@ -192,11 +193,11 @@ struct ExchangeRpcContext {
};

// Each ExchangeSinkOperator have one ExchangeSinkBuffer
class ExchangeSinkBuffer : public HasTaskExecutionCtx {
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
RuntimeState* state);
~ExchangeSinkBuffer();
RuntimeState* state, ExchangeSinkLocalState* parent);
~ExchangeSinkBuffer() = default;
void register_sink(TUniqueId);

Status add_block(TransmitInfo&& request);
Expand Down Expand Up @@ -265,14 +266,16 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();

std::atomic<int> _total_queue_size = 0;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
std::atomic<bool> _should_stop {false};
std::atomic<bool> _should_stop = false;
ExchangeSinkLocalState* _parent = nullptr;
};

} // namespace pipeline
Expand Down
9 changes: 6 additions & 3 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
_state->be_number(), state);
_state->be_number(), state, this);

register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
Expand Down Expand Up @@ -624,8 +624,11 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load());
fmt::format_to(debug_string_buffer,
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
"_reach_limit: {}",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
_sink_buffer->_is_finishing.load(), _reach_limit.load());
return fmt::to_string(debug_string_buffer);
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
RuntimeProfile::Counter* compress_timer() { return _compress_timer; }
RuntimeProfile::Counter* uncompressed_bytes_counter() { return _uncompressed_bytes_counter; }
[[nodiscard]] bool transfer_large_data_by_brpc() const;
bool is_finished() const override { return _reach_limit.load(); }
void set_reach_limit() { _reach_limit = true; };

[[nodiscard]] int sender_id() const { return _sender_id; }

Expand Down Expand Up @@ -199,6 +201,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class PipelineXSinkLocalStateBase {
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state, Status exec_status) = 0;
[[nodiscard]] virtual bool is_finished() const { return false; }

[[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0;

Expand Down Expand Up @@ -445,6 +446,13 @@ class DataSinkOperatorXBase : public OperatorBase {

Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }
[[nodiscard]] bool is_finished(RuntimeState* state) const {
auto result = state->get_sink_local_state_result();
if (!result) {
return result.error();
}
return result.value()->is_finished();
}

[[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos) = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
return Status::OK();
}

if (_exchanger) {
_exchanger->close(*this);
}
if (_shared_state) {
_shared_state->sub_running_source_operators();
}
Expand All @@ -67,6 +70,11 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators);
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
for (auto* mem_tracker : _shared_state->mem_trackers) {
fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption());
}
return fmt::to_string(debug_string_buffer);
}

Expand Down
25 changes: 25 additions & 0 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
return Status::OK();
}

void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
while (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
auto block_wrapper = partitioned_block.first;
local_state._shared_state->sub_mem_usage(
local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false);
block_wrapper->unref(local_state._shared_state);
}
}

Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
Expand Down Expand Up @@ -182,6 +192,14 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
return Status::OK();
}

void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
local_state._shared_state->sub_mem_usage(local_state._channel_id,
next_block.allocated_bytes());
}
}

Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
Expand Down Expand Up @@ -255,6 +273,13 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
return Status::OK();
}

void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
// do nothing
}
}

Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Exchanger {
virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) = 0;
virtual ExchangeType get_type() const = 0;
virtual void close(LocalExchangeSourceLocalState& local_state) {}

protected:
friend struct LocalExchangeSharedState;
Expand Down Expand Up @@ -108,6 +109,7 @@ class ShuffleExchanger : public Exchanger {

Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
void close(LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; }

protected:
Expand Down Expand Up @@ -150,6 +152,7 @@ class PassthroughExchanger final : public Exchanger {
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
void close(LocalExchangeSourceLocalState& local_state) override;

private:
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
Expand Down Expand Up @@ -188,6 +191,7 @@ class BroadcastExchanger final : public Exchanger {
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
void close(LocalExchangeSourceLocalState& local_state) override;

private:
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ class PipelineFragmentContext : public TaskExecutionContext {
}
}

void clear_finished_tasks() {
for (size_t j = 0; j < _tasks.size(); j++) {
for (size_t i = 0; i < _tasks[j].size(); i++) {
_tasks[j][i]->stop_if_finished();
}
}
};

private:
Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorXPtr* root, PipelinePtr cur_pipe);
Expand Down
12 changes: 7 additions & 5 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
_eos = _sink->is_finished(_state) || _eos;
*eos = _eos;
if (_eos) {
// If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
Expand Down Expand Up @@ -334,14 +335,15 @@ Status PipelineTask::execute(bool* eos) {
Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
return status;
});
// Pull block from operator chain
if (!_dry_run) {
// `_dry_run` means sink operator need no more data
// `_sink->is_finished(_state)` means sink operator should be finished
if (_dry_run || _sink->is_finished(_state)) {
*eos = true;
_eos = true;
} else {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state, block, eos));
} else {
*eos = true;
_eos = true;
}

if (_block->rows() != 0 || *eos) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ class PipelineTask {

std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }

void stop_if_finished() {
if (_sink->is_finished(_state)) {
clear_blocking_state();
}
}

private:
friend class RuntimeFilterDependency;
bool _is_blocked();
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,9 @@ void FragmentMgr::cancel_worker() {
to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
}
}
for (auto& pipeline_itr : _pipeline_map) {
pipeline_itr.second->clear_finished_tasks();
}
for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
if (auto q_ctx = it->second.lock()) {
if (q_ctx->is_timeout(now) && q_ctx->enable_pipeline_x_exec()) {
Expand Down