Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
id, p._dest_node_id, _sender_id, _state->be_number(), state, this);

register_channels(_sink_buffer.get());
_queue_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true, state->get_query_ctx());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"BroadcastDependency", true, state->get_query_ctx());
_broadcast_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), "BroadcastDependency", true);
_sink_buffer->set_broadcast_dependency(_broadcast_dependency);
_broadcast_pb_blocks =
vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
current_channel_idx(0),
only_local_exchange(false),
_serializer(this) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY", true);
}

std::vector<Dependency*> dependencies() const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1);
for (size_t i = 0; i < queues.size(); i++) {
deps[i] = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"SHUFFLE_DATA_DEPENDENCY", state->get_query_ctx());
"SHUFFLE_DATA_DEPENDENCY");
queues[i]->set_dependency(deps[i]);
metrics[i] = _runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i),
TUnit ::TIME_NS, timer_name, 1);
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase*
RuntimeState* state)
: JoinBuildSinkLocalState(parent, state) {
_finish_dependency = std::make_shared<CountedFinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY");
}

Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ namespace doris::pipeline {
PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: Base(parent, state) {
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY",
true, state->get_query_ctx());
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY", true);
}
Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo& info) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ bool ScanLocalState<Derived>::should_run_serial() const {
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
_scan_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
_parent->get_name() + "_DEPENDENCY", state->get_query_ctx());
_scan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
_parent->get_name() + "_DEPENDENCY");
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _scan_dependency->name() + "]Time", 1);
SCOPED_TIMER(exec_time_counter());
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
namespace doris::pipeline {
SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {
_finish_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY",
true, state->get_query_ctx());
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_SPILL_DEPENDENCY", true);
}

Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/union_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
->data_queue.set_source_dependency(_shared_state->source_deps.front());
} else {
_only_const_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY",
state->get_query_ctx());
_parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY");
_dependency = _only_const_dependency.get();
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
Expand Down
36 changes: 11 additions & 25 deletions be/src/pipeline/pipeline_x/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,14 @@
namespace doris::pipeline {

Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id,
std::string name, QueryContext* ctx) {
source_deps.push_back(
std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY", ctx));
std::string name) {
source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY"));
source_deps.back()->set_shared_state(this);
return source_deps.back().get();
}

Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name,
QueryContext* ctx) {
sink_deps.push_back(
std::make_shared<Dependency>(dest_id, node_id, name + "_DEPENDENCY", true, ctx));
Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name) {
sink_deps.push_back(std::make_shared<Dependency>(dest_id, node_id, name + "_DEPENDENCY", true));
sink_deps.back()->set_shared_state(this);
return sink_deps.back().get();
}
Expand Down Expand Up @@ -72,15 +69,6 @@ void Dependency::set_ready() {
}

Dependency* Dependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load() || _is_cancelled();
if (!ready && task) {
_add_block_task(task);
}
return ready ? nullptr : this;
}

Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load();
if (!ready && task) {
Expand All @@ -91,20 +79,18 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {

std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _always_ready={}, is cancelled={}",
fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, ready={}, _always_ready={}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
_ready, _always_ready, _is_cancelled());
_ready, _always_ready);
return fmt::to_string(debug_string_buffer);
}

std::string CountedFinishDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(
debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _always_ready={}, is cancelled={}, count={}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), _ready,
_always_ready, _is_cancelled(), _counter);
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _always_ready={}, count={}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
_ready, _always_ready, _counter);
return fmt::to_string(debug_string_buffer);
}

Expand All @@ -117,7 +103,7 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {

Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
std::unique_lock<std::mutex> lc(_task_lock);
auto ready = _ready.load() || _is_cancelled();
auto ready = _ready.load();
if (!ready && task) {
_add_block_task(task);
task->_blocked_dep = this;
Expand Down
46 changes: 15 additions & 31 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,30 +80,26 @@ struct BasicSharedState {

virtual ~BasicSharedState() = default;

Dependency* create_source_dependency(int operator_id, int node_id, std::string name,
QueryContext* ctx);
Dependency* create_source_dependency(int operator_id, int node_id, std::string name);

Dependency* create_sink_dependency(int dest_id, int node_id, std::string name,
QueryContext* ctx);
Dependency* create_sink_dependency(int dest_id, int node_id, std::string name);
};

class Dependency : public std::enable_shared_from_this<Dependency> {
public:
ENABLE_FACTORY_CREATOR(Dependency);
Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
Dependency(int id, int node_id, std::string name)
: _id(id),
_node_id(node_id),
_name(std::move(name)),
_is_write_dependency(false),
_ready(false),
_query_ctx(query_ctx) {}
Dependency(int id, int node_id, std::string name, bool ready, QueryContext* query_ctx)
_ready(false) {}
Dependency(int id, int node_id, std::string name, bool ready)
: _id(id),
_node_id(node_id),
_name(std::move(name)),
_is_write_dependency(true),
_ready(ready),
_query_ctx(query_ctx) {}
_ready(ready) {}
virtual ~Dependency() = default;

bool is_write_dependency() const { return _is_write_dependency; }
Expand Down Expand Up @@ -167,14 +163,12 @@ class Dependency : public std::enable_shared_from_this<Dependency> {

protected:
void _add_block_task(PipelineXTask* task);
bool _is_cancelled() const { return _query_ctx->is_cancelled(); }

const int _id;
const int _node_id;
const std::string _name;
const bool _is_write_dependency;
std::atomic<bool> _ready;
const QueryContext* _query_ctx = nullptr;

BasicSharedState* _shared_state = nullptr;
MonotonicStopWatch _watcher;
Expand All @@ -200,20 +194,11 @@ struct FakeDependency final : public Dependency {
[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { return nullptr; }
};

struct FinishDependency : public Dependency {
public:
using SharedState = FakeSharedState;
FinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx)
: Dependency(id, node_id, name, true, query_ctx) {}

[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override;
};

struct CountedFinishDependency final : public Dependency {
public:
using SharedState = FakeSharedState;
CountedFinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx)
: Dependency(id, node_id, name, true, query_ctx) {}
CountedFinishDependency(int id, int node_id, std::string name)
: Dependency(id, node_id, name, true) {}

void add() {
std::unique_lock<std::mutex> l(_mtx);
Expand Down Expand Up @@ -309,9 +294,8 @@ struct RuntimeFilterTimerQueue {

class RuntimeFilterDependency final : public Dependency {
public:
RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx,
IRuntimeFilter* runtime_filter)
: Dependency(id, node_id, name, query_ctx), _runtime_filter(runtime_filter) {}
RuntimeFilterDependency(int id, int node_id, std::string name, IRuntimeFilter* runtime_filter)
: Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
std::string debug_string(int indentation_level = 0) override;

Dependency* is_blocked_by(PipelineXTask* task) override;
Expand Down Expand Up @@ -621,8 +605,8 @@ class AsyncWriterDependency final : public Dependency {
public:
using SharedState = BasicSharedState;
ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
AsyncWriterDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "AsyncWriterDependency", true, query_ctx) {}
AsyncWriterDependency(int id, int node_id)
: Dependency(id, node_id, "AsyncWriterDependency", true) {}
~AsyncWriterDependency() override = default;
};

Expand Down Expand Up @@ -771,10 +755,10 @@ struct LocalExchangeSharedState : public BasicSharedState {
std::atomic<int64_t> mem_usage = 0;
// We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
void create_source_dependencies(int operator_id, int node_id, QueryContext* ctx) {
void create_source_dependencies(int operator_id, int node_id) {
for (size_t i = 0; i < source_deps.size(); i++) {
source_deps[i] = std::make_shared<Dependency>(
operator_id, node_id, "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx);
source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
source_deps[i]->set_shared_state(this);
}
};
Expand Down
10 changes: 4 additions & 6 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,7 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_shared_state = info.shared_state->template cast<SharedStateArg>();

_dependency = _shared_state->create_source_dependency(
_parent->operator_id(), _parent->node_id(), _parent->get_name(),
state->get_query_ctx());
_parent->operator_id(), _parent->node_id(), _parent->get_name());
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
}
Expand Down Expand Up @@ -482,8 +481,7 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
} else {
_shared_state = info.shared_state->template cast<SharedState>();
_dependency = _shared_state->create_sink_dependency(
_parent->dests_id().front(), _parent->node_id(), _parent->get_name(),
state->get_query_ctx());
_parent->dests_id().front(), _parent->node_id(), _parent->get_name());
}
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
Expand Down Expand Up @@ -563,8 +561,8 @@ template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
_async_writer_dependency = AsyncWriterDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
_async_writer_dependency =
AsyncWriterDependency::create_shared(_parent->operator_id(), _parent->node_id());
_writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get());

_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -869,9 +869,9 @@ class AsyncWriterSink : public PipelineXSinkLocalState<FakeSharedState> {
using Base = PipelineXSinkLocalState<FakeSharedState>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _async_writer_dependency(nullptr) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY", true);
}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
std::to_string((int)data_distribution.distribution_type));
}
auto sink_dep = std::make_shared<Dependency>(sink_id, local_exchange_id,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true,
_runtime_state->get_query_ctx());
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
_op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
Expand All @@ -877,8 +876,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
}
operator_xs.insert(operator_xs.begin(), source_op);

shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id(),
_query_ctx.get());
shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id());

// 5. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
Expand Down
20 changes: 15 additions & 5 deletions be/src/pipeline/pipeline_x/pipeline_x_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,22 @@ class PipelineXTask : public PipelineTask {
int task_id() const { return _index; };

void clear_blocking_state() {
// Another thread may call finalize to release all dependencies
// And then it will core.
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH && _blocked_dep) {
_blocked_dep->set_ready();
_blocked_dep = nullptr;
if (!_finished) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _read_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _write_dependencies) {
dep->set_always_ready();
}
for (auto* dep : _finish_dependencies) {
dep->set_always_ready();
}
}
}

Expand Down
Loading