diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp index cc43702ae9..99f7d7eaf0 100644 --- a/src/bthread/butex.cpp +++ b/src/bthread/butex.cpp @@ -100,6 +100,7 @@ struct ButexBthreadWaiter : public ButexWaiter { int expected_value; Butex* initial_butex; TaskControl* control; + const timespec* abstime; }; // pthread_task or main_task allocates this structure on stack and queue it @@ -534,6 +535,14 @@ static void wait_for_butex(void* arg) { !bw->task_meta->interrupted) { b->waiters.Append(bw); bw->container.store(b, butil::memory_order_relaxed); + if (bw->abstime != NULL) { + bw->sleep_id = get_global_timer_thread()->schedule( + erase_from_butex_and_wakeup, bw, *bw->abstime); + if (!bw->sleep_id) { // TimerThread stopped. + errno = ESTOP; + erase_from_butex_and_wakeup(bw); + } + } return; } } @@ -542,7 +551,7 @@ static void wait_for_butex(void* arg) { // TaskGroup::interrupt() no-op, there's no race between following code and // the two functions. The on-stack ButexBthreadWaiter is safe to use and // bw->waiter_state will not change again. - unsleep_if_necessary(bw, get_global_timer_thread()); + // unsleep_if_necessary(bw, get_global_timer_thread()); tls_task_group->ready_to_run(bw->tid); // FIXME: jump back to original thread is buggy. @@ -648,6 +657,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) { bbw.expected_value = expected_value; bbw.initial_butex = b; bbw.control = g->control(); + bbw.abstime = abstime; if (abstime != NULL) { // Schedule timer before queueing. If the timer is triggered before @@ -658,12 +668,6 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) { errno = ETIMEDOUT; return -1; } - bbw.sleep_id = get_global_timer_thread()->schedule( - erase_from_butex_and_wakeup, &bbw, *abstime); - if (!bbw.sleep_id) { // TimerThread stopped. - errno = ESTOP; - return -1; - } } #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS bvar::Adder& num_waiters = butex_waiter_count(); diff --git a/test/bthread_cond_bug_unittest.cpp b/test/bthread_cond_bug_unittest.cpp new file mode 100644 index 0000000000..91b2b0fc77 --- /dev/null +++ b/test/bthread_cond_bug_unittest.cpp @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include + +#include +#include + +#include "bthread/bthread.h" +#include "bthread/condition_variable.h" +#include "bthread/mutex.h" +#include "butil/logging.h" +#include "butil/macros.h" +#include "bvar/bvar.h" + +DEFINE_int64(wait_us, 5, "wait us"); +typedef std::unique_lock Lock; +typedef bthread::ConditionVariable Condition; +bthread::Mutex g_mutex; +Condition g_cond; +std::deque g_que; +const size_t g_capacity = 2000; +const int PRODUCER_NUM = 5; +struct ProducerStat { + std::atomic loop_count; + bvar::Adder wait_count; + bvar::Adder wait_timeout_count; + bvar::Adder wait_success_count; +}; +ProducerStat g_stat[PRODUCER_NUM]; + +void* print_func(void* arg) { + int last_loop[PRODUCER_NUM] = {0}; + for (int j = 0; j < 10; j++) { + usleep(1000000); + for (int i = 0; i < PRODUCER_NUM; i++) { + if (g_stat[i].loop_count.load() <= last_loop[i]) { + LOG(ERROR) << "producer thread:" << i << " stopped"; + return nullptr; + } + LOG(INFO) << "producer stat idx:" << i + << " wait:" << g_stat[i].wait_count + << " wait_timeout:" << g_stat[i].wait_timeout_count + << " wait_success:" << g_stat[i].wait_success_count; + g_stat[i].loop_count = g_stat[i].loop_count.load(); + } + } + return (void*)1; +} + +void* produce_func(void* arg) { + const int64_t wait_us = FLAGS_wait_us; + LOG(INFO) << "wait us:" << wait_us; + int64_t idx = (int64_t)(arg); + int32_t i = 0; + while (!bthread_stopped(bthread_self())) { + LOG(INFO) << "come to a new round " << idx << "round[" << i << "]"; + { + Lock lock(g_mutex); + while (g_que.size() >= g_capacity && !bthread_stopped(bthread_self())) { + g_stat[idx].wait_count << 1; + //LOG(INFO) << "wait begin " << idx; + int ret = g_cond.wait_for(lock, wait_us); + if (ret == ETIMEDOUT) { + g_stat[idx].wait_timeout_count << 1; + //LOG_EVERY_SECOND(INFO) << "wait timeout " << idx; + } else { + g_stat[idx].wait_success_count << 1; + //LOG_EVERY_SECOND(INFO) << "wait early " << idx; + } + } + g_que.push_back(++i); + LOG(INFO) << "push back " << idx << " data[" << i << "]"; + } + usleep(rand() % 20 + 5); + g_stat[idx].loop_count.fetch_add(1); + } + LOG(INFO) << "producer func return, idx:" << idx; + return nullptr; +} + +void* consume_func(void* arg) { + while (!bthread_stopped(bthread_self())) { + bool need_notify = false; + { + Lock lock(g_mutex); + need_notify = (g_que.size() == g_capacity); + if (!g_que.empty()) { + g_que.pop_front(); + LOG_EVERY_SECOND(INFO) << "pop a data"; + } else { + LOG_EVERY_SECOND(INFO) << "que is empty"; + } + } + usleep(rand() % 300 + 500); + if (need_notify) { + //g_cond.notify_all(); + //LOG(WARNING) << "notify"; + } + } + LOG(INFO) << "consumer func return"; + return nullptr; +} + +TEST(BthreadCondBugTest, test_bug) { + bthread_t tids[PRODUCER_NUM]; + for (int i = 0; i < PRODUCER_NUM; i++) { + bthread_start_background(&tids[i], NULL, produce_func, (void*)(int64_t)i); + } + bthread_t tid; + bthread_start_background(&tid, NULL, consume_func, NULL); + + int64_t ret = (int64_t)print_func(nullptr); + + bthread_stop(tid); + bthread_join(tid, nullptr); + for (int i = 0; i < PRODUCER_NUM; i++) { + bthread_stop(tids[i]); + bthread_join(tids[i], nullptr); + } + + ASSERT_EQ(ret, 1); +} \ No newline at end of file