From f110be6cd43d2db8ef8ffe4b1b6250499e342459 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sun, 7 Apr 2024 17:41:00 +0800 Subject: [PATCH 1/8] [pipelineX](runtime filter) Fix task timeout caused by runtime filter --- be/src/exprs/runtime_filter.cpp | 6 +- be/src/exprs/runtime_filter.h | 3 +- .../exec/multi_cast_data_stream_source.cpp | 27 ++++- .../exec/multi_cast_data_stream_source.h | 23 +++- be/src/pipeline/exec/scan_operator.cpp | 21 ++-- be/src/pipeline/exec/scan_operator.h | 17 ++- be/src/pipeline/pipeline_x/dependency.cpp | 110 ++++++------------ be/src/pipeline/pipeline_x/dependency.h | 74 +++--------- be/src/pipeline/pipeline_x/operator.h | 2 +- .../pipeline_x_fragment_context.cpp | 1 + .../pipeline/pipeline_x/pipeline_x_task.cpp | 41 +++---- be/src/pipeline/pipeline_x/pipeline_x_task.h | 10 +- be/src/runtime/runtime_state.cpp | 8 ++ be/src/runtime/runtime_state.h | 8 ++ be/src/vec/exec/runtime_filter_consumer.cpp | 64 +++++++--- be/src/vec/exec/runtime_filter_consumer.h | 8 +- be/src/vec/exec/scan/vscan_node.cpp | 2 +- 17 files changed, 212 insertions(+), 213 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f680db5adeee01..1c69a357f171fb 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1095,7 +1095,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::listget_push_exprs(probe_ctxs, push_exprs, _probe_expr)); } - _profile->add_info_string("Info", _format_status()); + _profile->add_info_string("Info", formatted_state()); // The runtime filter is pushed down, adding filtering information. 
auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT); auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT); @@ -1236,7 +1236,7 @@ void IRuntimeFilter::set_ignored(const std::string& msg) { _wrapper->_ignored_msg = msg; } -std::string IRuntimeFilter::_format_status() const { +std::string IRuntimeFilter::formatted_state() const { return fmt::format( "[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, HasRemoteTarget = {}, " "HasLocalTarget = {}]", @@ -1411,7 +1411,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { } else { _profile_init = true; parent_profile->add_child(_profile.get(), true, nullptr); - _profile->add_info_string("Info", _format_status()); + _profile->add_info_string("Info", formatted_state()); } } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 620d61ae5648e4..daccc71ff39ba0 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -355,6 +355,7 @@ class IRuntimeFilter { int64_t registration_time() const { return registration_time_; } void set_filter_timer(std::shared_ptr); + std::string formatted_state() const; protected: // serialize _wrapper to protobuf @@ -373,8 +374,6 @@ class IRuntimeFilter { void _set_push_down(bool push_down) { _is_push_down = push_down; } - std::string _format_status() const; - std::string _get_explain_state_string() const { if (_enable_pipeline_exec) { return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 049905c5cc4123..c40af83bd58ad6 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -77,7 +77,7 @@ Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) { if (_t_data_stream_sink.__isset.conjuncts) { 
RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state)); } - return _acquire_runtime_filter(); + return _acquire_runtime_filter(false); } bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() { @@ -129,10 +129,7 @@ MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime vectorized::RuntimeFilterConsumer(static_cast(parent)->dest_id_from_sink(), parent->runtime_filter_descs(), static_cast(parent)->_row_desc(), _conjuncts) { - _filter_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", - state->get_query_ctx()); -}; +} Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); @@ -145,12 +142,30 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); } + _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter"); // init profile for runtime filter RuntimeFilterConsumer::_init_profile(profile()); - init_runtime_filter_dependency(_filter_dependency.get()); + init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(), + p.get_name() + "_FILTER_DEPENDENCY"); return Status::OK(); } +Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } + + SCOPED_TIMER(_close_timer); + SCOPED_TIMER(exec_time_counter()); + int64_t rf_time = 0; + for (auto& dep : _filter_dependencies) { + rf_time += dep->watcher_elapse_time(); + } + COUNTER_SET(_wait_for_rf_timer, rf_time); + + return Base::close(state); +} + Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { //auto& local_state = get_local_state(state); diff --git 
a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 3af8c5507bdc8f..43c845758f79c2 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -104,16 +104,29 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState filter_dependencies() override { + if (_filter_dependencies.empty()) { + return {}; + } + std::vector res; + res.resize(_filter_dependencies.size()); + for (size_t i = 0; i < _filter_dependencies.size(); i++) { + res[i] = _filter_dependencies[i].get(); + } + return res; + } private: vectorized::VExprContextSPtrs _output_expr_contexts; - std::shared_ptr _filter_dependency; + std::vector> _filter_dependencies; + + RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; }; class MultiCastDataStreamerSourceOperatorX final @@ -185,7 +198,9 @@ class MultiCastDataStreamerSourceOperatorX final #ifdef __clang__ #pragma clang diagnostic pop #endif - const RowDescriptor& _row_desc() { return _row_descriptor; } + const RowDescriptor& _row_desc() { + return _row_descriptor; + } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index b3c9be34e53c17..cc6efb5a07932f 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -91,14 +91,6 @@ std::string ScanOperator::debug_string() const { return; \ } -template -ScanLocalState::ScanLocalState(RuntimeState* state, OperatorXBase* parent) - : ScanLocalStateBase(state, parent) { - _filter_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", - state->get_query_ctx()); -} - template bool ScanLocalState::ready_to_read() { return !_scanner_ctx->empty_in_queue(0); @@ -133,7 +125,8 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) } // init profile for runtime filter 
RuntimeFilterConsumer::_init_profile(profile()); - init_runtime_filter_dependency(_filter_dependency.get()); + init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(), + p.get_name() + "_FILTER_DEPENDENCY"); // 1: running at not pipeline mode will init profile. // 2: the scan node should create scanner at pipeline mode will init profile. @@ -156,7 +149,7 @@ Status ScanLocalState::open(RuntimeState* state) { if (_opened) { return Status::OK(); } - RETURN_IF_ERROR(_acquire_runtime_filter()); + RETURN_IF_ERROR(_acquire_runtime_filter(true)); RETURN_IF_ERROR(_process_conjuncts()); auto status = _eos ? Status::OK() : _prepare_scanners(); @@ -1407,7 +1400,11 @@ Status ScanLocalState::close(RuntimeState* state) { return Status::OK(); } COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time()); - COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time()); + int64_t rf_time = 0; + for (auto& dep : _filter_dependencies) { + rf_time += dep->watcher_elapse_time(); + } + COUNTER_UPDATE(exec_time_counter(), rf_time); SCOPED_TIMER(_close_timer); SCOPED_TIMER(exec_time_counter()); @@ -1416,7 +1413,7 @@ Status ScanLocalState::close(RuntimeState* state) { } std::list> {}.swap(_scanners); COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); - COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time()); + COUNTER_SET(_wait_for_rf_timer, rf_time); return PipelineXLocalState<>::close(state); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 7cb7b01a9a44d1..2f7b92df4ecbbb 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -134,7 +134,8 @@ class ScanOperatorX; template class ScanLocalState : public ScanLocalStateBase { ENABLE_FACTORY_CREATOR(ScanLocalState); - ScanLocalState(RuntimeState* state, OperatorXBase* parent); + ScanLocalState(RuntimeState* state, OperatorXBase* parent) + : 
ScanLocalStateBase(state, parent) {} ~ScanLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; @@ -165,7 +166,17 @@ class ScanLocalState : public ScanLocalStateBase { int64_t get_push_down_count() override; - RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }; + std::vector filter_dependencies() override { + if (_filter_dependencies.empty()) { + return {}; + } + std::vector res; + res.resize(_filter_dependencies.size()); + for (size_t i = 0; i < _filter_dependencies.size(); i++) { + res[i] = _filter_dependencies[i].get(); + } + return res; + } std::vector dependencies() const override { return {_scan_dependency.get()}; } @@ -364,7 +375,7 @@ class ScanLocalState : public ScanLocalStateBase { std::mutex _block_lock; - std::shared_ptr _filter_dependency; + std::vector> _filter_dependencies; // ScanLocalState owns the ownership of scanner, scanner context only has its weakptr std::list> _scanners; diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 016410b54bb589..f6062c7bbf9a3b 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -89,20 +89,6 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) { return ready ? 
nullptr : this; } -Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { - if (!_blocked_by_rf) { - return nullptr; - } - std::unique_lock lc(_task_lock); - if (*_blocked_by_rf && !_is_cancelled()) { - if (LIKELY(task)) { - _add_block_task(task); - } - return this; - } - return nullptr; -} - std::string Dependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, @@ -114,82 +100,58 @@ std::string Dependency::debug_string(int indentation_level) { std::string RuntimeFilterDependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, - "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}", - std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), - _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false); + fmt::format_to(debug_string_buffer, "{}, runtime filter: {}", + Dependency::debug_string(indentation_level), _runtime_filter->formatted_state()); return fmt::to_string(debug_string_buffer); } -bool RuntimeFilterTimer::has_ready() { - std::unique_lock lc(_lock); - return _is_ready; +Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { + std::unique_lock lc(_task_lock); + auto ready = _ready.load() || _is_cancelled(); + if (!ready && task) { + _add_block_task(task); + task->_blocked_dep = this; + } + return ready ? 
nullptr : this; } void RuntimeFilterTimer::call_timeout() { - std::unique_lock lc(_lock); - if (_call_ready) { - return; - } - _call_timeout = true; - if (_parent) { - _parent->sub_filters(_filter_id); - } + _parent->set_ready(); } void RuntimeFilterTimer::call_ready() { - std::unique_lock lc(_lock); - if (_call_timeout) { - return; - } - _call_ready = true; - if (_parent) { - _parent->sub_filters(_filter_id); - } - _is_ready = true; -} - -void RuntimeFilterTimer::call_has_ready() { - std::unique_lock lc(_lock); - DCHECK(!_call_timeout); - if (!_call_ready) { - _parent->sub_filters(_filter_id); - } + _parent->set_ready(); } -void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { - const auto filter_id = runtime_filter->filter_id(); - ; - _filters++; - _filter_ready_map[filter_id] = false; - int64_t registration_time = runtime_filter->registration_time(); - int32 wait_time_ms = runtime_filter->wait_time_ms(); - auto filter_timer = std::make_shared( - filter_id, registration_time, wait_time_ms, - std::dynamic_pointer_cast(shared_from_this())); - runtime_filter->set_filter_timer(filter_timer); - ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); -} +void RuntimeFilterTimerQueue::start() { + while (!_stop) { + std::unique_lock lk(cv_m); -void RuntimeFilterDependency::sub_filters(int id) { - std::vector local_block_task {}; - { - std::lock_guard lk(_task_lock); - if (!_filter_ready_map[id]) { - _filter_ready_map[id] = true; - _filters--; + cv.wait(lk, [this] { return !_que.empty() || _stop; }); + if (_stop) { + break; } - if (_filters == 0) { - _watcher.stop(); - { - *_blocked_by_rf = false; - local_block_task.swap(_blocked_task); + { + std::unique_lock lc(_que_lock); + std::list> new_que; + for (auto& it : _que) { + if (it.use_count() == 1) { + // `use_count == 1` means this runtime filter has been released + } else if (it->_parent->is_blocked_by(nullptr)) { + // This means runtime filter is not ready, so we call 
timeout or continue to poll this timer. + int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); + if (ms_since_registration > it->wait_time_ms()) { + it->call_timeout(); + } else { + new_que.push_back(std::move(it)); + } + } } + new_que.swap(_que); } + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); } - for (auto* task : local_block_task) { - task->wake_up(); - } + _shutdown = true; } void LocalExchangeSharedState::sub_running_sink_operators() { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 3ea096a81d6337..10ea60f47f6da3 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -209,75 +209,36 @@ struct FinishDependency final : public Dependency { }; class RuntimeFilterDependency; +struct RuntimeFilterTimerQueue; class RuntimeFilterTimer { public: - RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t wait_time_ms, + RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, std::shared_ptr parent) - : _filter_id(filter_id), - _parent(std::move(parent)), + : _parent(std::move(parent)), _registration_time(registration_time), _wait_time_ms(wait_time_ms) {} + // Called by runtime filter producer. void call_ready(); + // Called by RuntimeFilterTimerQueue which is responsible for checking if this rf is timeout. void call_timeout(); - void call_has_ready(); - - // When the use count is equal to 1, only the timer queue still holds ownership, - // so there is no need to take any action. 
- void call_has_release() {}; - - bool has_ready(); - int64_t registration_time() const { return _registration_time; } int32_t wait_time_ms() const { return _wait_time_ms; } private: - int _filter_id = -1; - bool _call_ready {}; - bool _call_timeout {}; - std::shared_ptr _parent; + friend struct RuntimeFilterTimerQueue; + std::shared_ptr _parent = nullptr; std::mutex _lock; const int64_t _registration_time; const int32_t _wait_time_ms; - bool _is_ready = false; }; struct RuntimeFilterTimerQueue { constexpr static int64_t interval = 10; void run() { _thread.detach(); } - void start() { - while (!_stop) { - std::unique_lock lk(cv_m); - - cv.wait(lk, [this] { return !_que.empty() || _stop; }); - if (_stop) { - break; - } - { - std::unique_lock lc(_que_lock); - std::list> new_que; - for (auto& it : _que) { - if (it.use_count() == 1) { - it->call_has_release(); - } else if (it->has_ready()) { - it->call_has_ready(); - } else { - int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); - if (ms_since_registration > it->wait_time_ms()) { - it->call_timeout(); - } else { - new_que.push_back(std::move(it)); - } - } - } - new_que.swap(_que); - } - std::this_thread::sleep_for(std::chrono::milliseconds(interval)); - } - _shutdown = true; - } + void start(); void stop() { _stop = true; @@ -311,21 +272,14 @@ struct RuntimeFilterTimerQueue { class RuntimeFilterDependency final : public Dependency { public: - RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx) - : Dependency(id, node_id, name, query_ctx) {} - Dependency* is_blocked_by(PipelineXTask* task) override; - void add_filters(IRuntimeFilter* runtime_filter); - void sub_filters(int id); - void set_blocked_by_rf(std::shared_ptr blocked_by_rf) { - _blocked_by_rf = blocked_by_rf; - } - + RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx, + IRuntimeFilter* runtime_filter) + : Dependency(id, node_id, name, query_ctx), 
_runtime_filter(runtime_filter) {} std::string debug_string(int indentation_level = 0) override; -protected: - std::atomic_int _filters; - phmap::flat_hash_map _filter_ready_map; - std::shared_ptr _blocked_by_rf; + Dependency* is_blocked_by(PipelineXTask* task) override; +private: + const IRuntimeFilter* _runtime_filter = nullptr; }; struct AggSharedState : public BasicSharedState { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 20c038c7469999..45e42390bc5f0c 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -102,7 +102,7 @@ class PipelineXLocalStateBase { // override in Scan virtual Dependency* finishdependency() { return nullptr; } // override in Scan MultiCastSink - virtual RuntimeFilterDependency* filterdependency() { return nullptr; } + virtual std::vector filter_dependencies() { return {}; } std::shared_ptr get_query_statistics_ptr() { return _query_statistics; } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index f888500cef65ec..37945758c9e0bb 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -599,6 +599,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( i); pipeline_id_to_task.insert({pipeline->id(), task.get()}); _tasks[i].emplace_back(std::move(task)); + task_runtime_state->set_pipeline_x_task(_tasks[i].back().get()); } } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 914170ec1efd3d..53db264b6c0acf 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -139,7 +139,11 @@ Status PipelineXTask::_extract_dependencies() { _finish_dependencies.push_back(fin_dep); } } - { _filter_dependency = 
_state->get_local_state(_source->operator_id())->filterdependency(); } + { + const auto& deps = _state->get_local_state(_source->operator_id())->filter_dependencies(); + std::copy(deps.begin(), deps.end(), + std::inserter(_filter_dependencies, _filter_dependencies.end())); + } return Status::OK(); } @@ -189,26 +193,10 @@ Status PipelineXTask::_open() { _dry_run = _sink->should_dry_run(_state); for (auto& o : _operators) { auto* local_state = _state->get_local_state(o->operator_id()); - // Here, it needs to loop twice because it's possible that when "open" happens, - // the filter is not ready yet. - // However, during the execution of "is_blocked_by," the filter may become ready, - // so it needs to be "open" again. - for (size_t i = 0; i < 2; i++) { - auto st = local_state->open(_state); - if (st.is()) { - DCHECK(_filter_dependency); - _blocked_dep = _filter_dependency->is_blocked_by(this); - if (_blocked_dep) { - set_state(PipelineTaskState::BLOCKED_FOR_RF); - RETURN_IF_ERROR(st); - } else if (i == 1) { - return Status::InternalError("Unknown RF error, task was blocked by RF twice"); - } - } else { - RETURN_IF_ERROR(st); - break; - } - } + auto st = local_state->open(_state); + DCHECK(st.is() ? !_filter_dependencies.empty() : true) + << debug_string(); + RETURN_IF_ERROR(st); } RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state)); _opened = true; @@ -245,6 +233,7 @@ Status PipelineXTask::execute(bool* eos) { SCOPED_RAW_TIMER(&time_spent); auto st = _open(); if (st.is()) { + set_state(PipelineTaskState::BLOCKED_FOR_RF); return Status::OK(); } RETURN_IF_ERROR(st); @@ -423,7 +412,7 @@ std::string PipelineXTask::debug_string() { if (_finished) { return fmt::to_string(debug_string_buffer); } - fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n"); + size_t i = 0; for (; i < _read_dependencies.size(); i++) { fmt::format_to(debug_string_buffer, "{}. 
{}\n", i, @@ -436,10 +425,10 @@ std::string PipelineXTask::debug_string() { _write_dependencies[j]->debug_string(i + 1)); } - if (_filter_dependency) { - fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); - fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); - i++; + fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency Information: \n"); + for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) { + fmt::format_to(debug_string_buffer, "{}. {}\n", i, + _filter_dependencies[j]->debug_string(i + 1)); } fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n"); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 6383fdefa4eaa7..302b8aeff1fc56 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -157,6 +157,7 @@ class PipelineXTask : public PipelineTask { static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes); private: + friend class RuntimeFilterDependency; Dependency* _write_blocked_dependency() { for (auto* op_dep : _write_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); @@ -180,6 +181,13 @@ class PipelineXTask : public PipelineTask { } Dependency* _read_blocked_dependency() { + for (auto* op_dep : _filter_dependencies) { + _blocked_dep = op_dep->is_blocked_by(this); + if (_blocked_dep != nullptr) { + _blocked_dep->start_watcher(); + return _blocked_dep; + } + } for (auto* op_dep : _read_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { @@ -204,7 +212,7 @@ class PipelineXTask : public PipelineTask { std::vector _read_dependencies; std::vector _write_dependencies; std::vector _finish_dependencies; - RuntimeFilterDependency* _filter_dependency; + std::vector _filter_dependencies; // All shared states of this pipeline task. 
std::map> _op_shared_states; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 1490931ab70215..a091ca6d6d140f 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -34,6 +34,7 @@ #include "common/status.h" #include "pipeline/exec/operator.h" #include "pipeline/pipeline_x/operator.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" #include "runtime/exec_env.h" #include "runtime/load_path_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" @@ -587,4 +588,11 @@ Status RuntimeState::register_consumer_runtime_filter(const doris::TRuntimeFilte bool RuntimeState::is_nereids() const { return _query_ctx->is_nereids(); } + +pipeline::PipelineXTask* RuntimeState::pipeline_x_task() const { + DCHECK(_pipeline_x_task && _pipeline_x_task->is_running()) + << (_pipeline_x_task ? _pipeline_x_task->debug_string() : "_pipeline_x_task is NULL"); + return _pipeline_x_task; +} + } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 66a3acd3041a7c..eddcd98a3d4c8a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -50,6 +50,7 @@ namespace pipeline { class PipelineXLocalStateBase; class PipelineXSinkLocalStateBase; class PipelineXFragmentContext; +class PipelineXTask; } // namespace pipeline class DescriptorTbl; @@ -623,6 +624,12 @@ class RuntimeState { int task_id() const { return _task_id; } + pipeline::PipelineXTask* pipeline_x_task() const; + + void set_pipeline_x_task(pipeline::PipelineXTask* pipeline_x_task) { + _pipeline_x_task = pipeline_x_task; + } + private: Status create_error_log_file(); @@ -746,6 +753,7 @@ class RuntimeState { bool _load_zero_tolerance = false; std::vector> _pipeline_id_to_profile; + pipeline::PipelineXTask* _pipeline_x_task = nullptr; // prohibit copies RuntimeState(const RuntimeState&); diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 
097df8016155c1..f943e131bbf1f1 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -17,6 +17,8 @@ #include "vec/exec/runtime_filter_consumer.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" + namespace doris::vectorized { RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, @@ -75,36 +77,62 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() { } void RuntimeFilterConsumer::init_runtime_filter_dependency( - doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) { - _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf); + std::vector>& + runtime_filter_dependencies, + const int id, const int node_id, const std::string& name) { + runtime_filter_dependencies.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - _runtime_filter_dependency->add_filters(runtime_filter); + runtime_filter_dependencies[i] = std::make_shared( + id, node_id, name, _state->get_query_ctx(), runtime_filter); + _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get(); + auto filter_timer = std::make_shared( + runtime_filter->registration_time(), runtime_filter->wait_time_ms(), + runtime_filter_dependencies[i]); + runtime_filter->set_filter_timer(filter_timer); + ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); } } -Status RuntimeFilterConsumer::_acquire_runtime_filter() { +Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) { SCOPED_TIMER(_acquire_runtime_filter_timer); std::vector vexprs; + Status rf_status = Status::OK(); for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - bool ready = runtime_filter->is_ready(); - if (!ready) { - ready = runtime_filter->await(); - } - if (ready && 
!_runtime_filter_ctxs[i].apply_mark) { - RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); - _runtime_filter_ctxs[i].apply_mark = true; - } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && - !_runtime_filter_ctxs[i].apply_mark) { - *_blocked_by_rf = true; - } else if (!_runtime_filter_ctxs[i].apply_mark) { - DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); - _is_all_rf_applied = false; + if (pipeline_x) { + DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency) + << _state->pipeline_x_task()->debug_string(); + auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by( + _state->pipeline_x_task()); + + bool ready = rf_dep == nullptr; + if (ready && !_runtime_filter_ctxs[i].apply_mark) { + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); + _runtime_filter_ctxs[i].apply_mark = true; + } else if (!ready && !_runtime_filter_ctxs[i].apply_mark) { + _is_all_rf_applied = false; + return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); + } + } else { + bool ready = runtime_filter->is_ready(); + if (!ready) { + ready = runtime_filter->await(); + } + if (ready && !_runtime_filter_ctxs[i].apply_mark) { + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); + _runtime_filter_ctxs[i].apply_mark = true; + } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && + !_runtime_filter_ctxs[i].apply_mark) { + *_blocked_by_rf = true; + } else if (!_runtime_filter_ctxs[i].apply_mark) { + DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); + _is_all_rf_applied = false; + } } } RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs)); - if (*_blocked_by_rf) { + if (!pipeline_x && *_blocked_by_rf) { return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); } diff --git a/be/src/vec/exec/runtime_filter_consumer.h 
b/be/src/vec/exec/runtime_filter_consumer.h index 86609624be6f40..61fdf13cd8bb11 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -38,13 +38,16 @@ class RuntimeFilterConsumer { bool runtime_filters_are_ready_or_timeout(); - void init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*); + void init_runtime_filter_dependency( + std::vector>& + runtime_filter_dependencies, + const int id, const int node_id, const std::string& name); protected: // Register and get all runtime filters at Init phase. Status _register_runtime_filter(bool need_local_merge); // Get all arrived runtime filters at Open phase. - Status _acquire_runtime_filter(); + Status _acquire_runtime_filter(bool pipeline_x); // Append late-arrival runtime filters to the vconjunct_ctx. Status _append_rf_into_conjuncts(const std::vector& vexprs); @@ -58,6 +61,7 @@ class RuntimeFilterConsumer { // set to true if this runtime filter is already applied to vconjunct_ctx_ptr bool apply_mark = false; IRuntimeFilter* runtime_filter = nullptr; + pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr; }; std::vector _runtime_filter_ctxs; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 372b8e6bca6a7a..744b68c0966210 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -187,7 +187,7 @@ Status VScanNode::alloc_resource(RuntimeState* state) { } _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); RETURN_IF_ERROR(ExecNode::alloc_resource(state)); - RETURN_IF_ERROR(_acquire_runtime_filter()); + RETURN_IF_ERROR(_acquire_runtime_filter(false)); RETURN_IF_ERROR(_process_conjuncts()); if (_is_pipeline_scan) { From 336c4ac0833722eb6d01df954dc63f6eef7cedaf Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sun, 7 Apr 2024 17:44:29 +0800 Subject: [PATCH 2/8] update --- be/src/pipeline/exec/multi_cast_data_stream_source.h | 4 +--- 
be/src/pipeline/pipeline_x/dependency.h | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 43c845758f79c2..fd1f6f2c033667 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -198,9 +198,7 @@ class MultiCastDataStreamerSourceOperatorX final #ifdef __clang__ #pragma clang diagnostic pop #endif - const RowDescriptor& _row_desc() { - return _row_descriptor; - } + const RowDescriptor& _row_desc() { return _row_descriptor; } }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 10ea60f47f6da3..65ef6ec29f852d 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -278,6 +278,7 @@ class RuntimeFilterDependency final : public Dependency { std::string debug_string(int indentation_level = 0) override; Dependency* is_blocked_by(PipelineXTask* task) override; + private: const IRuntimeFilter* _runtime_filter = nullptr; }; From f3816a244c545d57a17a94a5aad8200e3aafe617 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sun, 7 Apr 2024 17:48:21 +0800 Subject: [PATCH 3/8] update --- be/src/vec/exec/runtime_filter_consumer.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index f943e131bbf1f1..baa58d9a0120fa 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -107,6 +107,9 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) { _state->pipeline_x_task()); bool ready = rf_dep == nullptr; + if (!ready) { + runtime_filter->await(); + } if (ready && !_runtime_filter_ctxs[i].apply_mark) { RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); 
_runtime_filter_ctxs[i].apply_mark = true; From 6d263a4d84c4390eea62d9275feebe0ca9a1d0e5 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sun, 7 Apr 2024 19:26:12 +0800 Subject: [PATCH 4/8] update --- be/src/pipeline/pipeline_x/pipeline_x_task.h | 7 ------- 1 file changed, 7 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 302b8aeff1fc56..9a05689690fb12 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -181,13 +181,6 @@ class PipelineXTask : public PipelineTask { } Dependency* _read_blocked_dependency() { - for (auto* op_dep : _filter_dependencies) { - _blocked_dep = op_dep->is_blocked_by(this); - if (_blocked_dep != nullptr) { - _blocked_dep->start_watcher(); - return _blocked_dep; - } - } for (auto* op_dep : _read_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { From 29f6c816a73363222f3ca335cdaa8a78aa2fff2b Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sun, 7 Apr 2024 21:05:17 +0800 Subject: [PATCH 5/8] update --- be/src/pipeline/pipeline_x/dependency.cpp | 1 + be/src/pipeline/pipeline_x/dependency.h | 4 ++++ be/src/vec/exec/runtime_filter_consumer.cpp | 10 ++++++++-- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index f6062c7bbf9a3b..ffadf98983ca28 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -116,6 +116,7 @@ Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { } void RuntimeFilterTimer::call_timeout() { + _parent->set_timeout(); _parent->set_ready(); } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 65ef6ec29f852d..7060786cb7ff22 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -279,8 +279,12 @@ class 
RuntimeFilterDependency final : public Dependency { Dependency* is_blocked_by(PipelineXTask* task) override; + void set_timeout() { _is_timeout = true; } + bool timeout() const { return _is_timeout.load(); } + private: const IRuntimeFilter* _runtime_filter = nullptr; + std::atomic_bool _is_timeout = false; }; struct AggSharedState : public BasicSharedState { diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index baa58d9a0120fa..b4d151f8655387 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -106,16 +106,22 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) { auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by( _state->pipeline_x_task()); - bool ready = rf_dep == nullptr; + bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout(); + + bool ready = rf_dep == nullptr && !timeout; if (!ready) { runtime_filter->await(); } if (ready && !_runtime_filter_ctxs[i].apply_mark) { + // Runtime filter has been applied in open phase. RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); _runtime_filter_ctxs[i].apply_mark = true; + } else if (rf_dep != nullptr) { + // Runtime filter is neither ready nor timeout, so we should continue to wait RF. + return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); } else if (!ready && !_runtime_filter_ctxs[i].apply_mark) { + // Runtime filter is timeout and not applied. 
_is_all_rf_applied = false; - return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); } } else { bool ready = runtime_filter->is_ready(); From 226cee4cb165dbf90bc52ef6c3f655e2eb6ebbab Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 8 Apr 2024 11:06:39 +0800 Subject: [PATCH 6/8] update --- be/src/exprs/runtime_filter.cpp | 17 ++++++++++++++ be/src/exprs/runtime_filter.h | 1 + be/src/pipeline/pipeline_x/dependency.cpp | 5 ++++- be/src/pipeline/pipeline_x/dependency.h | 3 ++- .../pipeline_x_fragment_context.cpp | 1 - .../pipeline/pipeline_x/pipeline_x_task.cpp | 11 +++++----- be/src/pipeline/pipeline_x/pipeline_x_task.h | 11 ++++++++++ be/src/runtime/runtime_state.cpp | 6 ----- be/src/runtime/runtime_state.h | 7 ------ be/src/vec/exec/runtime_filter_consumer.cpp | 22 ++++--------------- 10 files changed, 44 insertions(+), 40 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 1c69a357f171fb..1c5c3f7d4a2907 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() { return true; } +void IRuntimeFilter::update_state() { + DCHECK(is_consumer()); + auto execution_timeout = _state->execution_timeout * 1000; + auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms; + // bitmap filter is precise filter and only filter once, so it must be applied. + int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER + ? execution_timeout + : runtime_filter_wait_time_ms; + auto expected = _rf_state_atomic.load(std::memory_order_acquire); + DCHECK(_enable_pipeline_exec); + // In pipelineX, runtime filters will be ready or timeout before open phase. + if (expected == RuntimeFilterState::NOT_READY) { + DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms); + _rf_state_atomic = RuntimeFilterState::TIME_OUT; + } +} + // NOTE: Wait infinitely will not make scan task wait really forever.
// Because BlockTaskSchedule will make it run when query is timedout. bool IRuntimeFilter::wait_infinitely() const { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index daccc71ff39ba0..ff825523ae198f 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -263,6 +263,7 @@ class IRuntimeFilter { // This function will wait at most config::runtime_filter_shuffle_wait_time_ms // if return true , filter is ready to use bool await(); + void update_state(); // this function will be called if a runtime filter sent by rpc // it will notify all wait threads void signal(); diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index ffadf98983ca28..d795c67687ccd4 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -128,7 +128,10 @@ void RuntimeFilterTimerQueue::start() { while (!_stop) { std::unique_lock lk(cv_m); - cv.wait(lk, [this] { return !_que.empty() || _stop; }); + while (_que.empty() && !_stop) { + cv.wait_for(lk, std::chrono::seconds(interval), + [this] { return !_que.empty() || _stop; }); + } if (_stop) { break; } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 7060786cb7ff22..7c34c4902924a7 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -243,6 +243,7 @@ struct RuntimeFilterTimerQueue { void stop() { _stop = true; cv.notify_all(); + wait_for_shutdown(); } void wait_for_shutdown() const { @@ -251,7 +252,7 @@ struct RuntimeFilterTimerQueue { } } - ~RuntimeFilterTimerQueue() { wait_for_shutdown(); } + ~RuntimeFilterTimerQueue() = default; RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } void push_filter_timer(std::shared_ptr filter) { push(filter); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 37945758c9e0bb..f888500cef65ec 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -599,7 +599,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( i); pipeline_id_to_task.insert({pipeline->id(), task.get()}); _tasks[i].emplace_back(std::move(task)); - task_runtime_state->set_pipeline_x_task(_tasks[i].back().get()); } } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 53db264b6c0acf..ccdf66b4a7d35a 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -227,16 +227,15 @@ Status PipelineXTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); return Status::OK(); } + if (_runtime_filter_blocked_dependency() != nullptr) { + set_state(PipelineTaskState::BLOCKED_FOR_RF); + return Status::OK(); + } // The status must be runnable if (!_opened) { { SCOPED_RAW_TIMER(&time_spent); - auto st = _open(); - if (st.is()) { - set_state(PipelineTaskState::BLOCKED_FOR_RF); - return Status::OK(); - } - RETURN_IF_ERROR(st); + RETURN_IF_ERROR(_open()); } if (!source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 9a05689690fb12..1f3dd9c3b71f5f 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -191,6 +191,17 @@ class PipelineXTask : public PipelineTask { return nullptr; } + Dependency* _runtime_filter_blocked_dependency() { + for (auto* op_dep : _filter_dependencies) { + _blocked_dep = op_dep->is_blocked_by(this); + if (_blocked_dep != nullptr) { + _blocked_dep->start_watcher(); + return _blocked_dep; + } + } + return nullptr; + } + Status _extract_dependencies(); void 
set_close_pipeline_time() override {} void _init_profile() override; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index a091ca6d6d140f..fcbf20c0f722bd 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -589,10 +589,4 @@ bool RuntimeState::is_nereids() const { return _query_ctx->is_nereids(); } -pipeline::PipelineXTask* RuntimeState::pipeline_x_task() const { - DCHECK(_pipeline_x_task && _pipeline_x_task->is_running()) - << (_pipeline_x_task ? _pipeline_x_task->debug_string() : "_pipeline_x_task is NULL"); - return _pipeline_x_task; -} - } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index eddcd98a3d4c8a..6fae242d53cd95 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -624,12 +624,6 @@ class RuntimeState { int task_id() const { return _task_id; } - pipeline::PipelineXTask* pipeline_x_task() const; - - void set_pipeline_x_task(pipeline::PipelineXTask* pipeline_x_task) { - _pipeline_x_task = pipeline_x_task; - } - private: Status create_error_log_file(); @@ -753,7 +747,6 @@ class RuntimeState { bool _load_zero_tolerance = false; std::vector> _pipeline_id_to_profile; - pipeline::PipelineXTask* _pipeline_x_task = nullptr; // prohibit copies RuntimeState(const RuntimeState&); diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index b4d151f8655387..2913fad3d8d8d1 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -97,30 +97,16 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency( Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) { SCOPED_TIMER(_acquire_runtime_filter_timer); std::vector vexprs; - Status rf_status = Status::OK(); for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; if 
(pipeline_x) { - DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency) - << _state->pipeline_x_task()->debug_string(); - auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by( - _state->pipeline_x_task()); - - bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout(); - - bool ready = rf_dep == nullptr && !timeout; - if (!ready) { - runtime_filter->await(); - } - if (ready && !_runtime_filter_ctxs[i].apply_mark) { + runtime_filter->update_state(); + if (runtime_filter->is_ready() && !_runtime_filter_ctxs[i].apply_mark) { // Runtime filter has been applied in open phase. RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); _runtime_filter_ctxs[i].apply_mark = true; - } else if (rf_dep != nullptr) { - // Runtime filter is neither ready nor timeout, so we should continue to wait RF. - return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); - } else if (!ready && !_runtime_filter_ctxs[i].apply_mark) { - // Runtime filter is timeout and not applied. + } else if (!_runtime_filter_ctxs[i].apply_mark) { + // Runtime filter is timeout. 
_is_all_rf_applied = false; } } else { From 372072f7a1761fb7e9c3cf0803fe3e3523f0ec17 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 8 Apr 2024 11:09:24 +0800 Subject: [PATCH 7/8] update --- be/src/pipeline/pipeline_x/dependency.cpp | 1 - be/src/pipeline/pipeline_x/dependency.h | 4 ---- 2 files changed, 5 deletions(-) diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index d795c67687ccd4..75af52080f0610 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -116,7 +116,6 @@ Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { } void RuntimeFilterTimer::call_timeout() { - _parent->set_timeout(); _parent->set_ready(); } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 7c34c4902924a7..1af48748d6c298 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -280,12 +280,8 @@ class RuntimeFilterDependency final : public Dependency { Dependency* is_blocked_by(PipelineXTask* task) override; - void set_timeout() { _is_timeout = true; } - bool timeout() const { return _is_timeout.load(); } - private: const IRuntimeFilter* _runtime_filter = nullptr; - std::atomic_bool _is_timeout = false; }; struct AggSharedState : public BasicSharedState { From 7f65d65cdb1e625094deee258e479d1bd6a942d6 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 8 Apr 2024 11:12:44 +0800 Subject: [PATCH 8/8] update --- be/src/pipeline/pipeline_x/dependency.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 75af52080f0610..73f089f4c1b211 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -128,8 +128,7 @@ void RuntimeFilterTimerQueue::start() { std::unique_lock lk(cv_m); while (_que.empty() && !_stop) { - cv.wait_for(lk, 
std::chrono::seconds(interval), - [this] { return !_que.empty() || _stop; }); + cv.wait_for(lk, std::chrono::seconds(3), [this] { return !_que.empty() || _stop; }); } if (_stop) { break;