From a9f73c571557b34e1d23272dbe933ce0c5f702ce Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Tue, 1 Apr 2025 18:17:13 +0800 Subject: [PATCH] add lock for RuntimeFilterConsumer::acquire_expr/signal to avoid multithread read and write wrapper add comment update update ut update update comment update fix update update --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +- .../runtime_filter_consumer.cpp | 12 +------- .../runtime_filter/runtime_filter_consumer.h | 29 +++++++++++++++++-- be/test/io/fs/s3_obj_storage_client_test.cpp | 18 +++++++----- ...rtitioned_hash_join_sink_operator_test.cpp | 5 +++- .../runtime_filter_consumer_test.cpp | 23 +++++++++++++++ 6 files changed, 66 insertions(+), 23 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 1ce34fe05fc958..88eac4b145a4a2 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -220,7 +220,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } if (p._use_shared_hash_table) { - std::unique_lock(p._mutex); + std::unique_lock lock(p._mutex); p._signaled = true; for (auto& dep : _shared_state->sink_deps) { dep->set_ready(); diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp b/be/src/runtime_filter/runtime_filter_consumer.cpp index 5c37a55c830fb7..33daafca322b52 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.cpp +++ b/be/src/runtime_filter/runtime_filter_consumer.cpp @@ -65,23 +65,13 @@ Status RuntimeFilterConsumer::acquire_expr(std::vectorruntime_filter_consumer_timeout_num->increment(1); - _profile->add_info_string("ReachTimeoutLimit", "true"); } return Status::OK(); } void RuntimeFilterConsumer::signal(RuntimeFilter* other) { COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS)); - _wrapper = other->_wrapper; - _check_wrapper_state({RuntimeFilterWrapper::State::DISABLED, - RuntimeFilterWrapper::State::IGNORED, - RuntimeFilterWrapper::State::READY}); - _check_state({State::NOT_READY, State::TIMEOUT}); - _set_state(State::READY); - DorisMetrics::instance()->runtime_filter_consumer_ready_num->increment(1); - DorisMetrics::instance()->runtime_filter_consumer_wait_ready_ms->increment(MonotonicMillis() - - _registration_time); + _set_state(State::READY, other->_wrapper); if (!_filter_timer.empty()) { for (auto& timer : _filter_timer) { timer->call_ready(); diff --git a/be/src/runtime_filter/runtime_filter_consumer.h b/be/src/runtime_filter/runtime_filter_consumer.h index 3ad4dc6dd44551..cc6581fa9fa776 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.h +++ b/be/src/runtime_filter/runtime_filter_consumer.h @@ -58,7 +58,9 @@ class RuntimeFilterConsumer : public RuntimeFilter { Status acquire_expr(std::vector& push_exprs); std::string debug_string() const override { - return fmt::format("Consumer: ({}, state: {})", _debug_string(), to_string(_rf_state)); + return fmt::format("Consumer: ({}, state: {}, reached_timeout: {}, timeout_limit: {}ms)", + _debug_string(), to_string(_rf_state), + _reached_timeout ? "true" : "false", std::to_string(_rf_wait_time_ms)); } bool is_applied() { return _rf_state == State::APPLIED; } @@ -120,7 +122,25 @@ class RuntimeFilterConsumer : public RuntimeFilter { } } - void _set_state(State rf_state) { + void _set_state(State rf_state, std::shared_ptr other = nullptr) { + std::unique_lock l(_mtx); + if (rf_state == State::TIMEOUT) { + DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1); + _reached_timeout = true; + if (_rf_state != State::NOT_READY) { + // reach timeout but do not change State::ready to State::timeout + return; + } + } else if (rf_state == State::READY) { + DorisMetrics::instance()->runtime_filter_consumer_ready_num->increment(1); + DorisMetrics::instance()->runtime_filter_consumer_wait_ready_ms->increment( + MonotonicMillis() - _registration_time); + _wrapper = other; + _check_wrapper_state({RuntimeFilterWrapper::State::DISABLED, + RuntimeFilterWrapper::State::IGNORED, + RuntimeFilterWrapper::State::READY}); + _check_state({State::NOT_READY, State::TIMEOUT}); + } _rf_state = rf_state; _profile->add_info_string("Info", debug_string()); } @@ -142,6 +162,11 @@ class RuntimeFilterConsumer : public RuntimeFilter { const int64_t _registration_time; std::atomic _rf_state; + // only used to lock _set_state() to make _wrapper and _rf_state is protected + // signal and acquire_expr are called in different threads at the same time + std::mutex _mtx; + + bool _reached_timeout = false; friend class RuntimeFilterProducer; }; diff --git a/be/test/io/fs/s3_obj_storage_client_test.cpp b/be/test/io/fs/s3_obj_storage_client_test.cpp index 22bbcc94819271..3e68adbeaa1bea 100644 --- a/be/test/io/fs/s3_obj_storage_client_test.cpp +++ b/be/test/io/fs/s3_obj_storage_client_test.cpp @@ -39,14 +39,16 @@ class S3ObjStorageClientTest : public testing::Test { S3ObjStorageClientTest::bucket = std::getenv("AWS_BUCKET"); - S3ObjStorageClientTest::obj_storage_client = - S3ClientFactory::instance().create({.endpoint = endpoint, - .region = "dummy-region", - .ak = access_key, - .sk = secret_key, - .bucket = bucket, - .provider = io::ObjStorageType::AWS, - .use_virtual_addressing = false}); + S3ObjStorageClientTest::obj_storage_client = S3ClientFactory::instance().create({ + .endpoint = endpoint, + .region = "dummy-region", + .ak = access_key, + .sk = secret_key, + .token = "", + .bucket = bucket, + .provider = io::ObjStorageType::AWS, + .use_virtual_addressing = false, + }); ASSERT_TRUE(S3ObjStorageClientTest::obj_storage_client != nullptr); } diff --git a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp index 010cd6ea223def..2bfdfdc618118d 100644 --- a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp +++ b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp @@ -119,7 +119,9 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, InitLocalState) { RuntimeProfile runtime_profile("test"); TDataSink t_sink; - LocalSinkStateInfo info {.parent_profile = &runtime_profile, + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = &runtime_profile, + .sender_id = 0, .shared_state = shared_state.get(), .shared_state_map = {}, .tsink = t_sink}; @@ -222,6 +224,7 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, SinkEosAndSpill) { LocalSinkStateInfo sink_info {.task_idx = 0, .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, .shared_state = shared_state.get(), .shared_state_map = {}, .tsink = TDataSink()}; diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index 44cb6105579fac..de6882b5acaaa2 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -237,4 +237,27 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) { RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); } +TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) { + for (int i = 0; i < 100; i++) { + std::shared_ptr consumer; + auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()), + &desc, 0, &consumer, &_profile)); + + std::shared_ptr producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); + + std::vector push_exprs; + std::thread thread1( + [&]() { [[maybe_unused]] auto res = consumer->acquire_expr(push_exprs); }); + std::thread thread2([&]() { consumer->signal(producer.get()); }); + thread1.join(); + thread2.join(); + + ASSERT_NE(consumer->_rf_state, RuntimeFilterConsumer::State::TIMEOUT); + } +} } // namespace doris