Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
if (_broadcast_dependency) {
COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time());
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,17 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
: PipelineXSinkLocalState<>(parent, state),
current_channel_idx(0),
only_local_exchange(false),
_serializer(this) {}
_serializer(this) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Dependency* dependency() override { return _exchange_sink_dependency.get(); }
Dependency* finishdependency() override { return _finish_dependency.get(); }
Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
void register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder);
Expand Down Expand Up @@ -236,6 +241,8 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;

std::shared_ptr<Dependency> _finish_dependency;
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
vectorized::RuntimeFilterConsumer(
static_cast<Parent*>(parent)->dest_id_from_sink(), parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
};

Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ class MultiCastDataStreamSourceLocalState final

friend class MultiCastDataStreamerSourceOperatorX;

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }

private:
vectorized::VExprContextSPtrs _output_expr_contexts;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};

class MultiCastDataStreamerSourceOperatorX final
Expand Down
14 changes: 12 additions & 2 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ std::string ScanOperator::debug_string() const {

template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {}
: ScanLocalStateBase(state, parent) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}

template <typename Derived>
bool ScanLocalState<Derived>::ready_to_read() {
Expand Down Expand Up @@ -1311,6 +1318,9 @@ Status ScanLocalState<Derived>::_init_profile() {

_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);

_wait_for_finish_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");

return Status::OK();
}

Expand Down Expand Up @@ -1442,7 +1452,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state);
}
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());

COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
return PipelineXLocalState<>::close(state);
}

Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
// time of prefilter input block from scanner
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;

RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
};

template <typename LocalStateType>
Expand Down Expand Up @@ -211,6 +213,9 @@ class ScanLocalState : public ScanLocalStateBase {

Dependency* dependency() override { return _scan_dependency.get(); }

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
Dependency* finishdependency() override { return _finish_dependency.get(); }

protected:
template <typename LocalStateType>
friend class ScanOperatorX;
Expand Down Expand Up @@ -405,6 +410,10 @@ class ScanLocalState : public ScanLocalStateBase {
std::atomic<bool> _eos = false;

std::mutex _block_lock;

std::shared_ptr<RuntimeFilterDependency> _filter_dependency;

std::shared_ptr<Dependency> _finish_dependency;
};

template <typename LocalStateType>
Expand Down
20 changes: 3 additions & 17 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,34 +308,21 @@ Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt

PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent,
RuntimeState* state)
: _parent(parent),
_state(state),
_finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx())) {}
: _parent(parent), _state(state) {}

PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: _num_rows_returned(0),
_rows_returned_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_parent(parent),
_state(state),
_finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx())) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}
_state(state) {}

template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalStateInfo& info) {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix()));
_runtime_profile->set_metadata(_parent->node_id());
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
_wait_for_finish_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
_dependency = (DependencyType*)info.dependency.get();
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
auto& deps = info.upstream_dependencies;
Expand Down Expand Up @@ -386,7 +373,6 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
Expand Down Expand Up @@ -446,7 +432,6 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
Expand Down Expand Up @@ -536,6 +521,7 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
return Status::OK();
}
COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
// if the init failed, the _writer may be nullptr. so here need check
if (_writer) {
if (_writer->need_normal_close()) {
Expand Down
23 changes: 15 additions & 8 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ class PipelineXLocalStateBase {

virtual Dependency* dependency() { return nullptr; }

Dependency* finishdependency() { return _finish_dependency.get(); }
RuntimeFilterDependency* filterdependency() { return _filter_dependency.get(); }
// Overridden by ScanLocalState; the default returns nullptr (no finish dependency).
virtual Dependency* finishdependency() { return nullptr; }
// Overridden by ScanLocalState and MultiCastDataStreamSourceLocalState; the default returns nullptr.
virtual RuntimeFilterDependency* filterdependency() { return nullptr; }

protected:
friend class OperatorXBase;
Expand All @@ -121,7 +123,6 @@ class PipelineXLocalStateBase {
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _projection_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
// Account for peak memory used by this node
Expand All @@ -135,8 +136,6 @@ class PipelineXLocalStateBase {
vectorized::VExprContextSPtrs _projections;
bool _closed = false;
vectorized::Block _origin_block;
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};

class OperatorXBase : public OperatorBase {
Expand Down Expand Up @@ -397,7 +396,8 @@ class PipelineXSinkLocalStateBase {
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
virtual Dependency* dependency() { return nullptr; }

Dependency* finishdependency() { return _finish_dependency.get(); }
// Overridden by ExchangeSinkLocalState and AsyncWriterSink; the default returns nullptr.
virtual Dependency* finishdependency() { return nullptr; }

protected:
DataSinkOperatorXBase* _parent = nullptr;
Expand All @@ -424,7 +424,6 @@ class PipelineXSinkLocalStateBase {
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
std::shared_ptr<Dependency> _finish_dependency;
};

class DataSinkOperatorXBase : public OperatorBase {
Expand Down Expand Up @@ -659,7 +658,11 @@ class AsyncWriterSink : public PipelineXSinkLocalState<FakeDependency> {
public:
using Base = PipelineXSinkLocalState<FakeDependency>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _async_writer_dependency(nullptr) {}
: Base(parent, state), _async_writer_dependency(nullptr) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

Expand All @@ -672,11 +675,15 @@ class AsyncWriterSink : public PipelineXSinkLocalState<FakeDependency> {

Status try_close(RuntimeState* state, Status exec_status) override;

Dependency* finishdependency() override { return _finish_dependency.get(); }

protected:
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
std::unique_ptr<Writer> _writer;

std::shared_ptr<AsyncWriterDependency> _async_writer_dependency;

std::shared_ptr<Dependency> _finish_dependency;
};

} // namespace doris::pipeline
7 changes: 4 additions & 3 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(

auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) {
DCHECK(pipeline_id_to_profile[pip_idx]);
RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), local_params,
request.fragment.output_sink));
RETURN_IF_ERROR(task->prepare(local_params, request.fragment.output_sink));
return Status::OK();
};

Expand Down Expand Up @@ -828,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
OperatorXPtr source_op;
source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
RETURN_IF_ERROR(source_op->init(exchange_type));
if (operator_xs.size() > 0) {
if (!operator_xs.empty()) {
RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
}
operator_xs.insert(operator_xs.begin(), source_op);
Expand Down Expand Up @@ -878,6 +877,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
const DescriptorTbl& descs, OperatorXPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx) {
// Operators are constructed directly from the Thrift plan-node array, which is laid out in
// preorder-traversal order, so a stack-like structure is needed here.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
std::stringstream error_msg;
switch (tnode.node_type) {
Expand Down
31 changes: 18 additions & 13 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()),
_le_state_map(le_state_map),
_le_state_map(std::move(le_state_map)),
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
Expand All @@ -67,15 +67,13 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
pipeline->incr_created_tasks();
}

Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
const TDataSink& tsink) {
Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink) {
DCHECK(_sink);
DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state);
_init_profile();
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_prepare_timer);
DCHECK_EQ(state, _state);

{
// set sink local state
Expand All @@ -85,20 +83,20 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
get_downstream_dependency(),
_le_state_map,
tsink};
RETURN_IF_ERROR(_sink->setup_local_state(state, info));
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
}

std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(), no_scan_ranges);
auto* parent_profile = state->get_sink_local_state(_sink->operator_id())->profile();
auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile();
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
LocalStateInfo info {parent_profile, scan_ranges, deps,
_le_state_map, _task_idx, _source_dependency[op->operator_id()]};
RETURN_IF_ERROR(op->setup_local_state(state, info));
parent_profile = state->get_local_state(op->operator_id())->profile();
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
}

_block = doris::vectorized::Block::create_unique();
Expand All @@ -120,7 +118,9 @@ Status PipelineXTask::_extract_dependencies() {
DCHECK(dep != nullptr);
_read_dependencies.push_back(dep);
auto* fin_dep = local_state->finishdependency();
_finish_dependencies.push_back(fin_dep);
if (fin_dep) {
_finish_dependencies.push_back(fin_dep);
}
}
{
auto result = _state->get_sink_local_state_result(_sink->operator_id());
Expand All @@ -132,7 +132,9 @@ Status PipelineXTask::_extract_dependencies() {
DCHECK(dep != nullptr);
_write_dependencies = dep;
auto* fin_dep = local_state->finishdependency();
_finish_dependencies.push_back(fin_dep);
if (fin_dep) {
_finish_dependencies.push_back(fin_dep);
}
}
{
auto result = _state->get_local_state_result(_source->operator_id());
Expand Down Expand Up @@ -193,6 +195,7 @@ Status PipelineXTask::_open() {
for (size_t i = 0; i < 2; i++) {
auto st = local_state->open(_state);
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
DCHECK(_filter_dependency);
_blocked_dep = _filter_dependency->is_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
Expand Down Expand Up @@ -377,9 +380,11 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1));
i++;

fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1));
i++;
if (_filter_dependency) {
fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1));
i++;
}

fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
Expand Down
Loading