Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
vectorized::RuntimeFilterConsumer(
static_cast<Parent*>(parent)->dest_id_from_sink(), parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
};

Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ class MultiCastDataStreamSourceLocalState final

friend class MultiCastDataStreamerSourceOperatorX;

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }

private:
vectorized::VExprContextSPtrs _output_expr_contexts;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};

class MultiCastDataStreamerSourceOperatorX final
Expand Down
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ std::string ScanOperator::debug_string() const {

template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {}
: ScanLocalStateBase(state, parent) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}

template <typename Derived>
bool ScanLocalState<Derived>::ready_to_read() {
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ class ScanLocalState : public ScanLocalStateBase {

Dependency* dependency() override { return _scan_dependency.get(); }

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };

protected:
template <typename LocalStateType>
friend class ScanOperatorX;
Expand Down Expand Up @@ -405,6 +407,8 @@ class ScanLocalState : public ScanLocalStateBase {
std::atomic<bool> _eos = false;

std::mutex _block_lock;

std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};

template <typename LocalStateType>
Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,7 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
_state(state),
_finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx())) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}
state->get_query_ctx())) {}

template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalStateInfo& info) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ class PipelineXLocalStateBase {
virtual Dependency* dependency() { return nullptr; }

Dependency* finishdependency() { return _finish_dependency.get(); }
RuntimeFilterDependency* filterdependency() { return _filter_dependency.get(); }
// override in Scan MultiCastSink
virtual RuntimeFilterDependency* filterdependency() { return nullptr; }

protected:
friend class OperatorXBase;
Expand Down Expand Up @@ -136,7 +137,6 @@ class PipelineXLocalStateBase {
bool _closed = false;
vectorized::Block _origin_block;
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};

class OperatorXBase : public OperatorBase {
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(

auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) {
DCHECK(pipeline_id_to_profile[pip_idx]);
RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), local_params,
request.fragment.output_sink));
RETURN_IF_ERROR(task->prepare(local_params, request.fragment.output_sink));
return Status::OK();
};

Expand Down Expand Up @@ -828,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
OperatorXPtr source_op;
source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
RETURN_IF_ERROR(source_op->init(exchange_type));
if (operator_xs.size() > 0) {
if (!operator_xs.empty()) {
RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
}
operator_xs.insert(operator_xs.begin(), source_op);
Expand Down Expand Up @@ -878,6 +877,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
const DescriptorTbl& descs, OperatorXPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx) {
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
std::stringstream error_msg;
switch (tnode.node_type) {
Expand Down
23 changes: 12 additions & 11 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()),
_le_state_map(le_state_map),
_le_state_map(std::move(le_state_map)),
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
Expand All @@ -67,15 +67,13 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
pipeline->incr_created_tasks();
}

Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
const TDataSink& tsink) {
Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink) {
DCHECK(_sink);
DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state);
_init_profile();
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_prepare_timer);
DCHECK_EQ(state, _state);

{
// set sink local state
Expand All @@ -85,20 +83,20 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
get_downstream_dependency(),
_le_state_map,
tsink};
RETURN_IF_ERROR(_sink->setup_local_state(state, info));
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
}

std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(), no_scan_ranges);
auto* parent_profile = state->get_sink_local_state(_sink->operator_id())->profile();
auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile();
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
LocalStateInfo info {parent_profile, scan_ranges, deps,
_le_state_map, _task_idx, _source_dependency[op->operator_id()]};
RETURN_IF_ERROR(op->setup_local_state(state, info));
parent_profile = state->get_local_state(op->operator_id())->profile();
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
}

_block = doris::vectorized::Block::create_unique();
Expand Down Expand Up @@ -193,6 +191,7 @@ Status PipelineXTask::_open() {
for (size_t i = 0; i < 2; i++) {
auto st = local_state->open(_state);
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
DCHECK(_filter_dependency);
_blocked_dep = _filter_dependency->is_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
Expand Down Expand Up @@ -377,9 +376,11 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1));
i++;

fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1));
i++;
if (_filter_dependency) {
fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1));
i++;
}

fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ class PipelineXTask : public PipelineTask {
return Status::InternalError("Should not reach here!");
}

Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
const TDataSink& tsink);
Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink);

Status execute(bool* eos) override;

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
DCHECK(req.runtime_state != nullptr);

if (req.query_statistics) {
// use to report 'insert into select'
TQueryStatistics queryStatistics;
DCHECK(req.query_statistics->collect_dml_statistics());
req.query_statistics->to_thrift(&queryStatistics);
Expand Down