Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
_set_push_down(!is_late_arrival);
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr));
}
_profile->add_info_string("Info", _format_status());
_profile->add_info_string("Info", formatted_state());
// The runtime filter is pushed down, adding filtering information.
auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT);
auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT);
Expand Down Expand Up @@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() {
return true;
}

// Finalizes the state of a consumer-side runtime filter for pipeline execution.
//
// If the filter is still NOT_READY when this is called, it is moved to TIME_OUT.
// Any other state is left untouched. Debug builds additionally verify that the
// filter is a consumer, that pipeline execution is enabled, and that the
// applicable wait time has really elapsed since registration.
void IRuntimeFilter::update_state() {
    DCHECK(is_consumer());
    // NOTE(review): the `* 1000` presumably converts seconds to milliseconds — confirm
    // against the definition of `execution_timeout`.
    auto execution_timeout = _state->execution_timeout * 1000;
    auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
    // bitmap filter is precise filter and only filter once, so it must be applied.
    int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
                                    ? execution_timeout
                                    : runtime_filter_wait_time_ms;
    auto expected = _rf_state_atomic.load(std::memory_order_acquire);
    DCHECK(_enable_pipeline_exec);
    // In pipelineX, runtime filters will be ready or timeout before open phase.
    if (expected == RuntimeFilterState::NOT_READY) {
        DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
        // Use a compare-and-swap instead of a blind store: if another thread moved the
        // state out of NOT_READY after the load above (presumably to READY when the
        // filter arrives), that newer state must not be overwritten with TIME_OUT.
        // The result is intentionally ignored; on failure the state is already final.
        _rf_state_atomic.compare_exchange_strong(expected, RuntimeFilterState::TIME_OUT);
    }
}

// NOTE: Wait infinitely will not make scan task wait really forever.
// Because BlockTaskSchedule will make it run when query is timedout.
bool IRuntimeFilter::wait_infinitely() const {
Expand Down Expand Up @@ -1236,7 +1253,7 @@ void IRuntimeFilter::set_ignored(const std::string& msg) {
_wrapper->_ignored_msg = msg;
}

std::string IRuntimeFilter::_format_status() const {
std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
"[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, HasRemoteTarget = {}, "
"HasLocalTarget = {}]",
Expand Down Expand Up @@ -1411,7 +1428,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
} else {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", _format_status());
_profile->add_info_string("Info", formatted_state());
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ class IRuntimeFilter {
// This function will wait at most config::runtime_filter_shuffle_wait_time_ms
// if return true , filter is ready to use
bool await();
void update_state();
// this function will be called if a runtime filter sent by rpc
// it will notify all wait threads
void signal();
Expand Down Expand Up @@ -355,6 +356,7 @@ class IRuntimeFilter {
int64_t registration_time() const { return registration_time_; }

void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
std::string formatted_state() const;

protected:
// serialize _wrapper to protobuf
Expand All @@ -373,8 +375,6 @@ class IRuntimeFilter {

void _set_push_down(bool push_down) { _is_push_down = push_down; }

std::string _format_status() const;

std::string _get_explain_state_string() const {
if (_enable_pipeline_exec) {
return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY
Expand Down
27 changes: 21 additions & 6 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
if (_t_data_stream_sink.__isset.conjuncts) {
RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
}
return _acquire_runtime_filter();
return _acquire_runtime_filter(false);
}

bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
Expand Down Expand Up @@ -129,10 +129,7 @@ MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
};
}

Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
Expand All @@ -145,12 +142,30 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
}
_wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter");
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
init_runtime_filter_dependency(_filter_dependency.get());
init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(),
p.get_name() + "_FILTER_DEPENDENCY");
return Status::OK();
}

// Closes this local state. On the first call it publishes the summed wait time of
// all runtime-filter dependencies into the "wait for runtime filter" counter and
// then delegates to Base::close(); once `_closed` is set it returns OK immediately.
Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
    if (!_closed) {
        // Both timers stay alive until the return below, so they also cover Base::close().
        SCOPED_TIMER(_close_timer);
        SCOPED_TIMER(exec_time_counter());

        // Accumulate the elapsed watcher time across every filter dependency.
        int64_t total_rf_wait = 0;
        for (auto& filter_dep : _filter_dependencies) {
            total_rf_wait += filter_dep->watcher_elapse_time();
        }
        COUNTER_SET(_wait_for_rf_timer, total_rf_wait);

        return Base::close(state);
    }
    return Status::OK();
}

Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block, bool* eos) {
//auto& local_state = get_local_state(state);
Expand Down
19 changes: 16 additions & 3 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,29 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<Mul

Status open(RuntimeState* state) override {
RETURN_IF_ERROR(Base::open(state));
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_acquire_runtime_filter(true));
return Status::OK();
}
Status close(RuntimeState* state) override;
friend class MultiCastDataStreamerSourceOperatorX;

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }
std::vector<Dependency*> filter_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
std::vector<Dependency*> res;
res.resize(_filter_dependencies.size());
for (size_t i = 0; i < _filter_dependencies.size(); i++) {
res[i] = _filter_dependencies[i].get();
}
return res;
}

private:
vectorized::VExprContextSPtrs _output_expr_contexts;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;

RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
};

class MultiCastDataStreamerSourceOperatorX final
Expand Down
21 changes: 9 additions & 12 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ std::string ScanOperator::debug_string() const {
return; \
}

template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}

template <typename Derived>
bool ScanLocalState<Derived>::ready_to_read() {
return !_scanner_ctx->empty_in_queue(0);
Expand Down Expand Up @@ -133,7 +125,8 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
}
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
init_runtime_filter_dependency(_filter_dependency.get());
init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(),
p.get_name() + "_FILTER_DEPENDENCY");

// 1: running at not pipeline mode will init profile.
// 2: the scan node should create scanner at pipeline mode will init profile.
Expand All @@ -156,7 +149,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
if (_opened) {
return Status::OK();
}
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_acquire_runtime_filter(true));
RETURN_IF_ERROR(_process_conjuncts());

auto status = _eos ? Status::OK() : _prepare_scanners();
Expand Down Expand Up @@ -1412,7 +1405,11 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
return Status::OK();
}
COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time());
COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time());
int64_t rf_time = 0;
for (auto& dep : _filter_dependencies) {
rf_time += dep->watcher_elapse_time();
}
COUNTER_UPDATE(exec_time_counter(), rf_time);
SCOPED_TIMER(_close_timer);

SCOPED_TIMER(exec_time_counter());
Expand All @@ -1421,7 +1418,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
}
std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, rf_time);

return PipelineXLocalState<>::close(state);
}
Expand Down
17 changes: 14 additions & 3 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ class ScanOperatorX;
template <typename Derived>
class ScanLocalState : public ScanLocalStateBase {
ENABLE_FACTORY_CREATOR(ScanLocalState);
ScanLocalState(RuntimeState* state, OperatorXBase* parent);
// Forwards `state` and `parent` to ScanLocalStateBase. The body is empty, so no
// other members are set up by this constructor.
ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {}
~ScanLocalState() override = default;

Status init(RuntimeState* state, LocalStateInfo& info) override;
Expand Down Expand Up @@ -165,7 +166,17 @@ class ScanLocalState : public ScanLocalStateBase {

int64_t get_push_down_count() override;

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
std::vector<Dependency*> filter_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
std::vector<Dependency*> res;
res.resize(_filter_dependencies.size());
for (size_t i = 0; i < _filter_dependencies.size(); i++) {
res[i] = _filter_dependencies[i].get();
}
return res;
}

// Returns a one-element list holding the raw pointer obtained from `_scan_dependency`.
std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }

Expand Down Expand Up @@ -364,7 +375,7 @@ class ScanLocalState : public ScanLocalStateBase {

std::mutex _block_lock;

std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;

// ScanLocalState owns the ownership of scanner, scanner context only has its weakptr
std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
Expand Down
112 changes: 38 additions & 74 deletions be/src/pipeline/pipeline_x/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,6 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) {
return ready ? nullptr : this;
}

Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
if (!_blocked_by_rf) {
return nullptr;
}
std::unique_lock<std::mutex> lc(_task_lock);
if (*_blocked_by_rf && !_is_cancelled()) {
if (LIKELY(task)) {
_add_block_task(task);
}
return this;
}
return nullptr;
}

std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
Expand All @@ -114,82 +100,60 @@ std::string Dependency::debug_string(int indentation_level) {

std::string RuntimeFilterDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}",
std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
_ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false);
fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
Dependency::debug_string(indentation_level), _runtime_filter->formatted_state());
return fmt::to_string(debug_string_buffer);
}

bool RuntimeFilterTimer::has_ready() {
std::unique_lock<std::mutex> lc(_lock);
return _is_ready;
// Reports whether this dependency currently blocks `task`.
//
// Returns nullptr when the dependency is ready or cancelled. Otherwise returns
// `this`; if `task` is non-null it is also added to the blocked-task list and its
// `_blocked_dep` is pointed at this dependency. Passing nullptr therefore only
// queries the state. The whole check runs under `_task_lock`.
Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
    std::unique_lock<std::mutex> guard(_task_lock);
    if (_ready.load() || _is_cancelled()) {
        return nullptr;
    }
    if (task) {
        _add_block_task(task);
        task->_blocked_dep = this;
    }
    return this;
}

void RuntimeFilterTimer::call_timeout() {
std::unique_lock<std::mutex> lc(_lock);
if (_call_ready) {
return;
}
_call_timeout = true;
if (_parent) {
_parent->sub_filters(_filter_id);
}
_parent->set_ready();
}

void RuntimeFilterTimer::call_ready() {
std::unique_lock<std::mutex> lc(_lock);
if (_call_timeout) {
return;
}
_call_ready = true;
if (_parent) {
_parent->sub_filters(_filter_id);
}
_is_ready = true;
}

void RuntimeFilterTimer::call_has_ready() {
std::unique_lock<std::mutex> lc(_lock);
DCHECK(!_call_timeout);
if (!_call_ready) {
_parent->sub_filters(_filter_id);
}
_parent->set_ready();
}

void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
const auto filter_id = runtime_filter->filter_id();
;
_filters++;
_filter_ready_map[filter_id] = false;
int64_t registration_time = runtime_filter->registration_time();
int32 wait_time_ms = runtime_filter->wait_time_ms();
auto filter_timer = std::make_shared<RuntimeFilterTimer>(
filter_id, registration_time, wait_time_ms,
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
runtime_filter->set_filter_timer(filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
void RuntimeFilterTimerQueue::start() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'start' can be made static [readability-convert-member-functions-to-static]

be/src/pipeline/pipeline_x/dependency.h:236:

-     void start();
+     static void start();

while (!_stop) {
std::unique_lock<std::mutex> lk(cv_m);

void RuntimeFilterDependency::sub_filters(int id) {
std::vector<PipelineXTask*> local_block_task {};
{
std::lock_guard<std::mutex> lk(_task_lock);
if (!_filter_ready_map[id]) {
_filter_ready_map[id] = true;
_filters--;
while (_que.empty() && !_stop) {
cv.wait_for(lk, std::chrono::seconds(3), [this] { return !_que.empty() || _stop; });
}
if (_stop) {
break;
}
if (_filters == 0) {
_watcher.stop();
{
*_blocked_by_rf = false;
local_block_task.swap(_blocked_task);
{
std::unique_lock<std::mutex> lc(_que_lock);
std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
for (auto& it : _que) {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been released
} else if (it->_parent->is_blocked_by(nullptr)) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
it->call_timeout();
} else {
new_que.push_back(std::move(it));
}
}
}
new_que.swap(_que);
}
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
for (auto* task : local_block_task) {
task->wake_up();
}
_shutdown = true;
}

void LocalExchangeSharedState::sub_running_sink_operators() {
Expand Down
Loading