Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}

if (p._use_shared_hash_table) {
std::unique_lock(p._mutex);
std::unique_lock lock(p._mutex);
p._signaled = true;
for (auto& dep : _shared_state->sink_deps) {
dep->set_ready();
Expand Down
12 changes: 1 addition & 11 deletions be/src/runtime_filter/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,13 @@ Status RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilte
}
if (_rf_state != State::APPLIED && _rf_state != State::TIMEOUT) {
_set_state(State::TIMEOUT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe set TIMEOUT/ after set READY

DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1);
_profile->add_info_string("ReachTimeoutLimit", "true");
}
return Status::OK();
}

void RuntimeFilterConsumer::signal(RuntimeFilter* other) {
COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS));
_wrapper = other->_wrapper;
_check_wrapper_state({RuntimeFilterWrapper::State::DISABLED,
RuntimeFilterWrapper::State::IGNORED,
RuntimeFilterWrapper::State::READY});
_check_state({State::NOT_READY, State::TIMEOUT});
_set_state(State::READY);
DorisMetrics::instance()->runtime_filter_consumer_ready_num->increment(1);
DorisMetrics::instance()->runtime_filter_consumer_wait_ready_ms->increment(MonotonicMillis() -
_registration_time);
_set_state(State::READY, other->_wrapper);
if (!_filter_timer.empty()) {
for (auto& timer : _filter_timer) {
timer->call_ready();
Expand Down
29 changes: 27 additions & 2 deletions be/src/runtime_filter/runtime_filter_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ class RuntimeFilterConsumer : public RuntimeFilter {
Status acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& push_exprs);

std::string debug_string() const override {
return fmt::format("Consumer: ({}, state: {})", _debug_string(), to_string(_rf_state));
return fmt::format("Consumer: ({}, state: {}, reached_timeout: {}, timeout_limit: {}ms)",
_debug_string(), to_string(_rf_state),
_reached_timeout ? "true" : "false", std::to_string(_rf_wait_time_ms));
}

bool is_applied() { return _rf_state == State::APPLIED; }
Expand Down Expand Up @@ -120,7 +122,25 @@ class RuntimeFilterConsumer : public RuntimeFilter {
}
}

void _set_state(State rf_state) {
void _set_state(State rf_state, std::shared_ptr<RuntimeFilterWrapper> other = nullptr) {
std::unique_lock<std::mutex> l(_mtx);
if (rf_state == State::TIMEOUT) {
DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1);
_reached_timeout = true;
if (_rf_state != State::NOT_READY) {
// reach timeout but do not change State::ready to State::timeout
return;
}
} else if (rf_state == State::READY) {
DorisMetrics::instance()->runtime_filter_consumer_ready_num->increment(1);
DorisMetrics::instance()->runtime_filter_consumer_wait_ready_ms->increment(
MonotonicMillis() - _registration_time);
_wrapper = other;
_check_wrapper_state({RuntimeFilterWrapper::State::DISABLED,
RuntimeFilterWrapper::State::IGNORED,
RuntimeFilterWrapper::State::READY});
_check_state({State::NOT_READY, State::TIMEOUT});
}
_rf_state = rf_state;
_profile->add_info_string("Info", debug_string());
}
Expand All @@ -142,6 +162,11 @@ class RuntimeFilterConsumer : public RuntimeFilter {
const int64_t _registration_time;

std::atomic<State> _rf_state;
// only used to lock _set_state() to make _wrapper and _rf_state is protected
// signal and acquire_expr are called in different threads at the same time
std::mutex _mtx;

bool _reached_timeout = false;

friend class RuntimeFilterProducer;
};
Expand Down
18 changes: 10 additions & 8 deletions be/test/io/fs/s3_obj_storage_client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ class S3ObjStorageClientTest : public testing::Test {

S3ObjStorageClientTest::bucket = std::getenv("AWS_BUCKET");

S3ObjStorageClientTest::obj_storage_client =
S3ClientFactory::instance().create({.endpoint = endpoint,
.region = "dummy-region",
.ak = access_key,
.sk = secret_key,
.bucket = bucket,
.provider = io::ObjStorageType::AWS,
.use_virtual_addressing = false});
S3ObjStorageClientTest::obj_storage_client = S3ClientFactory::instance().create({
.endpoint = endpoint,
.region = "dummy-region",
.ak = access_key,
.sk = secret_key,
.token = "",
.bucket = bucket,
.provider = io::ObjStorageType::AWS,
.use_virtual_addressing = false,
});

ASSERT_TRUE(S3ObjStorageClientTest::obj_storage_client != nullptr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, InitLocalState) {

RuntimeProfile runtime_profile("test");
TDataSink t_sink;
LocalSinkStateInfo info {.parent_profile = &runtime_profile,
LocalSinkStateInfo info {.task_idx = 0,
.parent_profile = &runtime_profile,
.sender_id = 0,
.shared_state = shared_state.get(),
.shared_state_map = {},
.tsink = t_sink};
Expand Down Expand Up @@ -222,6 +224,7 @@ TEST_F(PartitionedHashJoinSinkOperatorTest, SinkEosAndSpill) {

LocalSinkStateInfo sink_info {.task_idx = 0,
.parent_profile = _helper.runtime_profile.get(),
.sender_id = 0,
.shared_state = shared_state.get(),
.shared_state_map = {},
.tsink = TDataSink()};
Expand Down
23 changes: 23 additions & 0 deletions be/test/runtime_filter/runtime_filter_consumer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,27 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile));
}

TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) {
for (int i = 0; i < 100; i++) {
std::shared_ptr<RuntimeFilterConsumer> consumer;
auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
&desc, 0, &consumer, &_profile));

std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);

std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
std::thread thread1(
[&]() { [[maybe_unused]] auto res = consumer->acquire_expr(push_exprs); });
std::thread thread2([&]() { consumer->signal(producer.get()); });
thread1.join();
thread2.join();

ASSERT_NE(consumer->_rf_state, RuntimeFilterConsumer::State::TIMEOUT);
}
}
} // namespace doris
Loading