diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f680db5adeee01..1c5c3f7d4a2907 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1095,7 +1095,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::listget_push_exprs(probe_ctxs, push_exprs, _probe_expr)); } - _profile->add_info_string("Info", _format_status()); + _profile->add_info_string("Info", formatted_state()); // The runtime filter is pushed down, adding filtering information. auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT); auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT); @@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() { return true; } +void IRuntimeFilter::update_state() { + DCHECK(is_consumer()); + auto execution_timeout = _state->execution_timeout * 1000; + auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms; + // bitmap filter is precise filter and only filter once, so it must be applied. + int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER + ? execution_timeout + : runtime_filter_wait_time_ms; + auto expected = _rf_state_atomic.load(std::memory_order_acquire); + DCHECK(_enable_pipeline_exec); + // In pipelineX, runtime filters will be ready or timeout before open phase. + if (expected == RuntimeFilterState::NOT_READY) { + DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms); + _rf_state_atomic = RuntimeFilterState::TIME_OUT; + } +} + // NOTE: Wait infinitely will not make scan task wait really forever. // Because BlockTaskSchedule will make it run when query is timedout. bool IRuntimeFilter::wait_infinitely() const { @@ -1236,7 +1253,7 @@ void IRuntimeFilter::set_ignored(const std::string& msg) { _wrapper->_ignored_msg = msg; } -std::string IRuntimeFilter::_format_status() const { +std::string IRuntimeFilter::formatted_state() const { return fmt::format( "[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, HasRemoteTarget = {}, " "HasLocalTarget = {}]", @@ -1411,7 +1428,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { } else { _profile_init = true; parent_profile->add_child(_profile.get(), true, nullptr); - _profile->add_info_string("Info", _format_status()); + _profile->add_info_string("Info", formatted_state()); } } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 620d61ae5648e4..ff825523ae198f 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -263,6 +263,7 @@ class IRuntimeFilter { // This function will wait at most config::runtime_filter_shuffle_wait_time_ms // if return true , filter is ready to use bool await(); + void update_state(); // this function will be called if a runtime filter sent by rpc // it will notify all wait threads void signal(); @@ -355,6 +356,7 @@ class IRuntimeFilter { int64_t registration_time() const { return registration_time_; } void set_filter_timer(std::shared_ptr); + std::string formatted_state() const; protected: // serialize _wrapper to protobuf @@ -373,8 +375,6 @@ class IRuntimeFilter { void _set_push_down(bool push_down) { _is_push_down = push_down; } - std::string _format_status() const; - std::string _get_explain_state_string() const { if (_enable_pipeline_exec) { return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 049905c5cc4123..c40af83bd58ad6 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -77,7 +77,7 @@ Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) { if (_t_data_stream_sink.__isset.conjuncts) { RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state)); } - return _acquire_runtime_filter(); + return _acquire_runtime_filter(false); } bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() { @@ -129,10 +129,7 @@ MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime vectorized::RuntimeFilterConsumer(static_cast(parent)->dest_id_from_sink(), parent->runtime_filter_descs(), static_cast(parent)->_row_desc(), _conjuncts) { - _filter_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", - state->get_query_ctx()); -}; +} Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); @@ -145,12 +142,30 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); } + _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter"); // init profile for runtime filter RuntimeFilterConsumer::_init_profile(profile()); - init_runtime_filter_dependency(_filter_dependency.get()); + init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(), + p.get_name() + "_FILTER_DEPENDENCY"); return Status::OK(); } +Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } + + SCOPED_TIMER(_close_timer); + SCOPED_TIMER(exec_time_counter()); + int64_t rf_time = 0; + for (auto& dep : _filter_dependencies) { + rf_time += dep->watcher_elapse_time(); + } + COUNTER_SET(_wait_for_rf_timer, rf_time); + + return Base::close(state); +} + Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { //auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 3af8c5507bdc8f..fd1f6f2c033667 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -104,16 +104,29 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState filter_dependencies() override { + if (_filter_dependencies.empty()) { + return {}; + } + std::vector res; + res.resize(_filter_dependencies.size()); + for (size_t i = 0; i < _filter_dependencies.size(); i++) { + res[i] = _filter_dependencies[i].get(); + } + return res; + } private: vectorized::VExprContextSPtrs _output_expr_contexts; - std::shared_ptr _filter_dependency; + std::vector> _filter_dependencies; + + RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; }; class MultiCastDataStreamerSourceOperatorX final diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 2f21b1626a5e88..4218ac1308dcbb 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -91,14 +91,6 @@ std::string ScanOperator::debug_string() const { return; \ } -template -ScanLocalState::ScanLocalState(RuntimeState* state, OperatorXBase* parent) - : ScanLocalStateBase(state, parent) { - _filter_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", - state->get_query_ctx()); -} - template bool ScanLocalState::ready_to_read() { return !_scanner_ctx->empty_in_queue(0); @@ -133,7 +125,8 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) } // init profile for runtime filter RuntimeFilterConsumer::_init_profile(profile()); - init_runtime_filter_dependency(_filter_dependency.get()); + init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(), + p.get_name() + "_FILTER_DEPENDENCY"); // 1: running at not pipeline mode will init profile. // 2: the scan node should create scanner at pipeline mode will init profile. @@ -156,7 +149,7 @@ Status ScanLocalState::open(RuntimeState* state) { if (_opened) { return Status::OK(); } - RETURN_IF_ERROR(_acquire_runtime_filter()); + RETURN_IF_ERROR(_acquire_runtime_filter(true)); RETURN_IF_ERROR(_process_conjuncts()); auto status = _eos ? Status::OK() : _prepare_scanners(); @@ -1412,7 +1405,11 @@ Status ScanLocalState::close(RuntimeState* state) { return Status::OK(); } COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time()); - COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time()); + int64_t rf_time = 0; + for (auto& dep : _filter_dependencies) { + rf_time += dep->watcher_elapse_time(); + } + COUNTER_UPDATE(exec_time_counter(), rf_time); SCOPED_TIMER(_close_timer); SCOPED_TIMER(exec_time_counter()); @@ -1421,7 +1418,7 @@ Status ScanLocalState::close(RuntimeState* state) { } std::list> {}.swap(_scanners); COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); - COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time()); + COUNTER_SET(_wait_for_rf_timer, rf_time); return PipelineXLocalState<>::close(state); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index a22a00da529978..e941f8ce969295 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -134,7 +134,8 @@ class ScanOperatorX; template class ScanLocalState : public ScanLocalStateBase { ENABLE_FACTORY_CREATOR(ScanLocalState); - ScanLocalState(RuntimeState* state, OperatorXBase* parent); + ScanLocalState(RuntimeState* state, OperatorXBase* parent) + : ScanLocalStateBase(state, parent) {} ~ScanLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; @@ -165,7 +166,17 @@ class ScanLocalState : public ScanLocalStateBase { int64_t get_push_down_count() override; - RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }; + std::vector filter_dependencies() override { + if (_filter_dependencies.empty()) { + return {}; + } + std::vector res; + res.resize(_filter_dependencies.size()); + for (size_t i = 0; i < _filter_dependencies.size(); i++) { + res[i] = _filter_dependencies[i].get(); + } + return res; + } std::vector dependencies() const override { return {_scan_dependency.get()}; } @@ -364,7 +375,7 @@ class ScanLocalState : public ScanLocalStateBase { std::mutex _block_lock; - std::shared_ptr _filter_dependency; + std::vector> _filter_dependencies; // ScanLocalState owns the ownership of scanner, scanner context only has its weakptr std::list> _scanners; diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 016410b54bb589..73f089f4c1b211 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -89,20 +89,6 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) { return ready ? nullptr : this; } -Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { - if (!_blocked_by_rf) { - return nullptr; - } - std::unique_lock lc(_task_lock); - if (*_blocked_by_rf && !_is_cancelled()) { - if (LIKELY(task)) { - _add_block_task(task); - } - return this; - } - return nullptr; -} - std::string Dependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, @@ -114,82 +100,60 @@ std::string Dependency::debug_string(int indentation_level) { std::string RuntimeFilterDependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, - "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}", - std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), - _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false); + fmt::format_to(debug_string_buffer, "{}, runtime filter: {}", + Dependency::debug_string(indentation_level), _runtime_filter->formatted_state()); return fmt::to_string(debug_string_buffer); } -bool RuntimeFilterTimer::has_ready() { - std::unique_lock lc(_lock); - return _is_ready; +Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { + std::unique_lock lc(_task_lock); + auto ready = _ready.load() || _is_cancelled(); + if (!ready && task) { + _add_block_task(task); + task->_blocked_dep = this; + } + return ready ? nullptr : this; } void RuntimeFilterTimer::call_timeout() { - std::unique_lock lc(_lock); - if (_call_ready) { - return; - } - _call_timeout = true; - if (_parent) { - _parent->sub_filters(_filter_id); - } + _parent->set_ready(); } void RuntimeFilterTimer::call_ready() { - std::unique_lock lc(_lock); - if (_call_timeout) { - return; - } - _call_ready = true; - if (_parent) { - _parent->sub_filters(_filter_id); - } - _is_ready = true; -} - -void RuntimeFilterTimer::call_has_ready() { - std::unique_lock lc(_lock); - DCHECK(!_call_timeout); - if (!_call_ready) { - _parent->sub_filters(_filter_id); - } + _parent->set_ready(); } -void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { - const auto filter_id = runtime_filter->filter_id(); - ; - _filters++; - _filter_ready_map[filter_id] = false; - int64_t registration_time = runtime_filter->registration_time(); - int32 wait_time_ms = runtime_filter->wait_time_ms(); - auto filter_timer = std::make_shared( - filter_id, registration_time, wait_time_ms, - std::dynamic_pointer_cast(shared_from_this())); - runtime_filter->set_filter_timer(filter_timer); - ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); -} +void RuntimeFilterTimerQueue::start() { + while (!_stop) { + std::unique_lock lk(cv_m); -void RuntimeFilterDependency::sub_filters(int id) { - std::vector local_block_task {}; - { - std::lock_guard lk(_task_lock); - if (!_filter_ready_map[id]) { - _filter_ready_map[id] = true; - _filters--; + while (_que.empty() && !_stop) { + cv.wait_for(lk, std::chrono::seconds(3), [this] { return !_que.empty() || _stop; }); + } + if (_stop) { + break; } - if (_filters == 0) { - _watcher.stop(); - { - *_blocked_by_rf = false; - local_block_task.swap(_blocked_task); + { + std::unique_lock lc(_que_lock); + std::list> new_que; + for (auto& it : _que) { + if (it.use_count() == 1) { + // `use_count == 1` means this runtime filter has been released + } else if (it->_parent->is_blocked_by(nullptr)) { + // This means runtime filter is not ready, so we call timeout or continue to poll this timer. + int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); + if (ms_since_registration > it->wait_time_ms()) { + it->call_timeout(); + } else { + new_que.push_back(std::move(it)); + } + } } + new_que.swap(_que); } + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); } - for (auto* task : local_block_task) { - task->wake_up(); - } + _shutdown = true; } void LocalExchangeSharedState::sub_running_sink_operators() { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index fe95c1c4470058..c7f37881cd239c 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -205,79 +205,41 @@ struct FinishDependency final : public Dependency { }; class RuntimeFilterDependency; +struct RuntimeFilterTimerQueue; class RuntimeFilterTimer { public: - RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t wait_time_ms, + RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, std::shared_ptr parent) - : _filter_id(filter_id), - _parent(std::move(parent)), + : _parent(std::move(parent)), _registration_time(registration_time), _wait_time_ms(wait_time_ms) {} + // Called by runtime filter producer. void call_ready(); + // Called by RuntimeFilterTimerQueue which is responsible for checking if this rf is timeout. void call_timeout(); - void call_has_ready(); - - // When the use count is equal to 1, only the timer queue still holds ownership, - // so there is no need to take any action. - void call_has_release() {}; - - bool has_ready(); - int64_t registration_time() const { return _registration_time; } int32_t wait_time_ms() const { return _wait_time_ms; } private: - int _filter_id = -1; - bool _call_ready {}; - bool _call_timeout {}; - std::shared_ptr _parent; + friend struct RuntimeFilterTimerQueue; + std::shared_ptr _parent = nullptr; std::mutex _lock; const int64_t _registration_time; const int32_t _wait_time_ms; - bool _is_ready = false; }; struct RuntimeFilterTimerQueue { constexpr static int64_t interval = 10; void run() { _thread.detach(); } - void start() { - while (!_stop) { - std::unique_lock lk(cv_m); - - cv.wait(lk, [this] { return !_que.empty() || _stop; }); - if (_stop) { - break; - } - { - std::unique_lock lc(_que_lock); - std::list> new_que; - for (auto& it : _que) { - if (it.use_count() == 1) { - it->call_has_release(); - } else if (it->has_ready()) { - it->call_has_ready(); - } else { - int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); - if (ms_since_registration > it->wait_time_ms()) { - it->call_timeout(); - } else { - new_que.push_back(std::move(it)); - } - } - } - new_que.swap(_que); - } - std::this_thread::sleep_for(std::chrono::milliseconds(interval)); - } - _shutdown = true; - } + void start(); void stop() { _stop = true; cv.notify_all(); + wait_for_shutdown(); } void wait_for_shutdown() const { @@ -286,7 +248,7 @@ struct RuntimeFilterTimerQueue { } } - ~RuntimeFilterTimerQueue() { wait_for_shutdown(); } + ~RuntimeFilterTimerQueue() = default; RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } void push_filter_timer(std::shared_ptr filter) { push(filter); } @@ -307,21 +269,15 @@ struct RuntimeFilterTimerQueue { class RuntimeFilterDependency final : public Dependency { public: - RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx) - : Dependency(id, node_id, name, query_ctx) {} - Dependency* is_blocked_by(PipelineXTask* task) override; - void add_filters(IRuntimeFilter* runtime_filter); - void sub_filters(int id); - void set_blocked_by_rf(std::shared_ptr blocked_by_rf) { - _blocked_by_rf = blocked_by_rf; - } - + RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx, + IRuntimeFilter* runtime_filter) + : Dependency(id, node_id, name, query_ctx), _runtime_filter(runtime_filter) {} std::string debug_string(int indentation_level = 0) override; -protected: - std::atomic_int _filters; - phmap::flat_hash_map _filter_ready_map; - std::shared_ptr _blocked_by_rf; + Dependency* is_blocked_by(PipelineXTask* task) override; + +private: + const IRuntimeFilter* _runtime_filter = nullptr; }; struct AggSharedState : public BasicSharedState { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index c375efb924dcbc..da20b9885a8771 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -102,7 +102,7 @@ class PipelineXLocalStateBase { // override in Scan virtual Dependency* finishdependency() { return nullptr; } // override in Scan MultiCastSink - virtual RuntimeFilterDependency* filterdependency() { return nullptr; } + virtual std::vector filter_dependencies() { return {}; } std::shared_ptr get_query_statistics_ptr() { return _query_statistics; } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index da5da2f0477277..9d5338e7f5e331 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -139,7 +139,11 @@ Status PipelineXTask::_extract_dependencies() { _finish_dependencies.push_back(fin_dep); } } - { _filter_dependency = _state->get_local_state(_source->operator_id())->filterdependency(); } + { + const auto& deps = _state->get_local_state(_source->operator_id())->filter_dependencies(); + std::copy(deps.begin(), deps.end(), + std::inserter(_filter_dependencies, _filter_dependencies.end())); + } return Status::OK(); } @@ -189,22 +193,7 @@ Status PipelineXTask::_open() { _dry_run = _sink->should_dry_run(_state); for (auto& o : _operators) { auto* local_state = _state->get_local_state(o->operator_id()); - for (size_t i = 0; i < 2; i++) { - auto st = local_state->open(_state); - if (st.is()) { - DCHECK(_filter_dependency); - _blocked_dep = _filter_dependency->is_blocked_by(this); - if (_blocked_dep) { - set_state(PipelineTaskState::BLOCKED_FOR_RF); - RETURN_IF_ERROR(st); - } else if (i == 1) { - return Status::InternalError("Unknown RF error, task was blocked by RF twice"); - } - } else { - RETURN_IF_ERROR(st); - break; - } - } + RETURN_IF_ERROR(local_state->open(_state)); } RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state)); _opened = true; @@ -234,15 +223,15 @@ Status PipelineXTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); return Status::OK(); } + if (_runtime_filter_blocked_dependency() != nullptr) { + set_state(PipelineTaskState::BLOCKED_FOR_RF); + return Status::OK(); + } // The status must be runnable if (!_opened) { { SCOPED_RAW_TIMER(&time_spent); - auto st = _open(); - if (st.is()) { - return Status::OK(); - } - RETURN_IF_ERROR(st); + RETURN_IF_ERROR(_open()); } if (!source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); @@ -396,7 +385,7 @@ std::string PipelineXTask::debug_string() { if (_finished) { return fmt::to_string(debug_string_buffer); } - fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n"); + size_t i = 0; for (; i < _read_dependencies.size(); i++) { fmt::format_to(debug_string_buffer, "{}. {}\n", i, @@ -409,10 +398,10 @@ std::string PipelineXTask::debug_string() { _write_dependencies[j]->debug_string(i + 1)); } - if (_filter_dependency) { - fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); - fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); - i++; + fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency Information: \n"); + for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) { + fmt::format_to(debug_string_buffer, "{}. {}\n", i, + _filter_dependencies[j]->debug_string(i + 1)); } fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n"); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index c754af645ef7c5..a89df75fc9b271 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -155,6 +155,7 @@ class PipelineXTask : public PipelineTask { } private: + friend class RuntimeFilterDependency; Dependency* _write_blocked_dependency() { for (auto* op_dep : _write_dependencies) { _blocked_dep = op_dep->is_blocked_by(this); @@ -188,6 +189,17 @@ class PipelineXTask : public PipelineTask { return nullptr; } + Dependency* _runtime_filter_blocked_dependency() { + for (auto* op_dep : _filter_dependencies) { + _blocked_dep = op_dep->is_blocked_by(this); + if (_blocked_dep != nullptr) { + _blocked_dep->start_watcher(); + return _blocked_dep; + } + } + return nullptr; + } + Status _extract_dependencies(); void set_close_pipeline_time() override {} void _init_profile() override; @@ -202,7 +214,7 @@ class PipelineXTask : public PipelineTask { std::vector _read_dependencies; std::vector _write_dependencies; std::vector _finish_dependencies; - RuntimeFilterDependency* _filter_dependency; + std::vector _filter_dependencies; // All shared states of this pipeline task. std::map> _op_shared_states; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 480521f58d31e4..df1b166c5fa079 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -34,6 +34,7 @@ #include "common/status.h" #include "pipeline/exec/operator.h" #include "pipeline/pipeline_x/operator.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" #include "runtime/exec_env.h" #include "runtime/load_path_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" @@ -543,4 +544,5 @@ Status RuntimeState::register_consumer_runtime_filter(const doris::TRuntimeFilte bool RuntimeState::is_nereids() const { return _query_ctx->is_nereids(); } + } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 79b82b94a11184..e6ffce6c81ddec 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -50,6 +50,7 @@ namespace pipeline { class PipelineXLocalStateBase; class PipelineXSinkLocalStateBase; class PipelineXFragmentContext; +class PipelineXTask; } // namespace pipeline class DescriptorTbl; diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 097df8016155c1..2913fad3d8d8d1 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -17,6 +17,8 @@ #include "vec/exec/runtime_filter_consumer.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" + namespace doris::vectorized { RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, @@ -75,36 +77,57 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() { } void RuntimeFilterConsumer::init_runtime_filter_dependency( - doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) { - _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf); + std::vector>& + runtime_filter_dependencies, + const int id, const int node_id, const std::string& name) { + runtime_filter_dependencies.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - _runtime_filter_dependency->add_filters(runtime_filter); + runtime_filter_dependencies[i] = std::make_shared( + id, node_id, name, _state->get_query_ctx(), runtime_filter); + _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get(); + auto filter_timer = std::make_shared( + runtime_filter->registration_time(), runtime_filter->wait_time_ms(), + runtime_filter_dependencies[i]); + runtime_filter->set_filter_timer(filter_timer); + ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); } } -Status RuntimeFilterConsumer::_acquire_runtime_filter() { +Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) { SCOPED_TIMER(_acquire_runtime_filter_timer); std::vector vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - bool ready = runtime_filter->is_ready(); - if (!ready) { - ready = runtime_filter->await(); - } - if (ready && !_runtime_filter_ctxs[i].apply_mark) { - RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); - _runtime_filter_ctxs[i].apply_mark = true; - } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && - !_runtime_filter_ctxs[i].apply_mark) { - *_blocked_by_rf = true; - } else if (!_runtime_filter_ctxs[i].apply_mark) { - DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); - _is_all_rf_applied = false; + if (pipeline_x) { + runtime_filter->update_state(); + if (runtime_filter->is_ready() && !_runtime_filter_ctxs[i].apply_mark) { + // Runtime filter has been applied in open phase. + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); + _runtime_filter_ctxs[i].apply_mark = true; + } else if (!_runtime_filter_ctxs[i].apply_mark) { + // Runtime filter is timeout. + _is_all_rf_applied = false; + } + } else { + bool ready = runtime_filter->is_ready(); + if (!ready) { + ready = runtime_filter->await(); + } + if (ready && !_runtime_filter_ctxs[i].apply_mark) { + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); + _runtime_filter_ctxs[i].apply_mark = true; + } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && + !_runtime_filter_ctxs[i].apply_mark) { + *_blocked_by_rf = true; + } else if (!_runtime_filter_ctxs[i].apply_mark) { + DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); + _is_all_rf_applied = false; + } } } RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs)); - if (*_blocked_by_rf) { + if (!pipeline_x && *_blocked_by_rf) { return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); } diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index 86609624be6f40..61fdf13cd8bb11 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -38,13 +38,16 @@ class RuntimeFilterConsumer { bool runtime_filters_are_ready_or_timeout(); - void init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*); + void init_runtime_filter_dependency( + std::vector>& + runtime_filter_dependencies, + const int id, const int node_id, const std::string& name); protected: // Register and get all runtime filters at Init phase. Status _register_runtime_filter(bool need_local_merge); // Get all arrived runtime filters at Open phase. - Status _acquire_runtime_filter(); + Status _acquire_runtime_filter(bool pipeline_x); // Append late-arrival runtime filters to the vconjunct_ctx. Status _append_rf_into_conjuncts(const std::vector& vexprs); @@ -58,6 +61,7 @@ class RuntimeFilterConsumer { // set to true if this runtime filter is already applied to vconjunct_ctx_ptr bool apply_mark = false; IRuntimeFilter* runtime_filter = nullptr; + pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr; }; std::vector _runtime_filter_ctxs; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 5f8e6d8aa4c897..282d6f1182eb83 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -187,7 +187,7 @@ Status VScanNode::alloc_resource(RuntimeState* state) { } _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); RETURN_IF_ERROR(ExecNode::alloc_resource(state)); - RETURN_IF_ERROR(_acquire_runtime_filter()); + RETURN_IF_ERROR(_acquire_runtime_filter(false)); RETURN_IF_ERROR(_process_conjuncts()); if (_is_pipeline_scan) {