From a92e15214bcecb3a9b324e5d616afb964bb69031 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 27 Mar 2025 16:25:11 +0800 Subject: [PATCH 1/6] GH-45953: Trying to use lock to fix bug in Generator --- cpp/src/arrow/util/async_generator.h | 49 +++++++++++++++++++--------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index a815b092800..1312617ced5 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -748,21 +749,31 @@ class ReadaheadGenerator { auto state = state_; return fut.Then( [state](const T& result) -> Future { - state->MarkFinishedIfDone(result); - if (state->finished.load()) { - if (state->num_running.fetch_sub(1) == 1) { - state->final_future.MarkFinished(); + bool mark_finished = false; + { + std::unique_lock lock(state->mu); + state->MarkFinishedIfDone(result); + --state->num_running; + if (state->finished) { + mark_finished = state->num_running == 0; } - } else { - state->num_running.fetch_sub(1); + } + if (mark_finished) { + state->final_future.MarkFinished(); } return result; }, [state](const Status& err) -> Future { // If there is an error we need to make sure all running // tasks finish before we return the error. - state->finished.store(true); - if (state->num_running.fetch_sub(1) == 1) { + bool mark_finished = false; + { + std::lock_guard lock(state->mu); + state->finished = true; + --state->num_running; + mark_finished = state->num_running == 0; + } + if (mark_finished) { state->final_future.MarkFinished(); } return state->final_future.Then([err]() -> Result { return err; }); @@ -772,7 +783,10 @@ class ReadaheadGenerator { Future operator()() { if (state_->readahead_queue.empty()) { // This is the first request, let's pump the underlying queue - state_->num_running.store(state_->max_readahead); + { + std::lock_guard lock(state_->mu); + state_->num_running = state_->max_readahead; + } for (int i = 0; i < state_->max_readahead; i++) { auto next = state_->source_generator(); auto next_after_check = AddMarkFinishedContinuation(std::move(next)); @@ -780,12 +794,15 @@ class ReadaheadGenerator { } } // Pop one and add one - auto result = state_->readahead_queue.front(); + auto result = std::move(state_->readahead_queue.front()); state_->readahead_queue.pop(); - if (state_->finished.load()) { + std::unique_lock lock(state_->mu); + if (state_->finished) { + lock.unlock(); state_->readahead_queue.push(AsyncGeneratorEnd()); } else { - state_->num_running.fetch_add(1); + ++state_->num_running; + lock.unlock(); auto back_of_queue = state_->source_generator(); auto back_of_queue_after_check = AddMarkFinishedContinuation(std::move(back_of_queue)); @@ -800,16 +817,18 @@ class ReadaheadGenerator { : source_generator(std::move(source_generator)), max_readahead(max_readahead) {} void MarkFinishedIfDone(const T& next_result) { + // ASSERT_HELD(mu) if (IsIterationEnd(next_result)) { - finished.store(true); + finished = true; } } AsyncGenerator source_generator; int max_readahead; Future<> final_future = Future<>::Make(); - std::atomic num_running{0}; - std::atomic finished{false}; + int num_running{0}; // GUARDED_BY(mu) + bool finished{false}; // GUARDED_BY(mu) + std::mutex mu; std::queue> readahead_queue; }; From f72f1646119b55f6c8af66bc8d4a9a1130fe416d Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 27 Mar 2025 22:20:00 +0800 Subject: [PATCH 2/6] Turns std::mutex to arrow/util/mutex --- cpp/src/arrow/util/async_generator.h | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 1312617ced5..d4d78061d48 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -751,7 +750,7 @@ class ReadaheadGenerator { [state](const T& result) -> Future { bool mark_finished = false; { - std::unique_lock lock(state->mu); + auto guard = state->mu.Lock(); state->MarkFinishedIfDone(result); --state->num_running; if (state->finished) { @@ -768,7 +767,7 @@ class ReadaheadGenerator { // tasks finish before we return the error. bool mark_finished = false; { - std::lock_guard lock(state->mu); + auto guard = state->mu.Lock(); state->finished = true; --state->num_running; mark_finished = state->num_running == 0; @@ -784,7 +783,7 @@ class ReadaheadGenerator { if (state_->readahead_queue.empty()) { // This is the first request, let's pump the underlying queue { - std::lock_guard lock(state_->mu); + auto guard = state_->mu.Lock(); state_->num_running = state_->max_readahead; } for (int i = 0; i < state_->max_readahead; i++) { @@ -796,13 +795,13 @@ class ReadaheadGenerator { // Pop one and add one auto result = std::move(state_->readahead_queue.front()); state_->readahead_queue.pop(); - std::unique_lock lock(state_->mu); + auto guard = state_->mu.Lock(); if (state_->finished) { - lock.unlock(); + guard.Unlock(); state_->readahead_queue.push(AsyncGeneratorEnd()); } else { ++state_->num_running; - lock.unlock(); + guard.Unlock(); auto back_of_queue = state_->source_generator(); auto back_of_queue_after_check = AddMarkFinishedContinuation(std::move(back_of_queue)); @@ -828,7 +827,7 @@ class ReadaheadGenerator { Future<> final_future = Future<>::Make(); int num_running{0}; // GUARDED_BY(mu) bool finished{false}; // GUARDED_BY(mu) - std::mutex mu; + arrow::util::Mutex mu; std::queue> readahead_queue; }; @@ -1432,9 +1431,8 @@ class MergedGenerator { immediate_inner(next_item.result()); if (immediate_inner.was_empty) { Future> next_source = state->PullSource(); - if (next_source.TryAddCallback([this] { - return OuterCallback{state, index}; - })) { + if (next_source.TryAddCallback( + [this] { return OuterCallback{state, index}; })) { // We hit an unfinished future so we can stop looping return; } From 740b1cfebae3db46247acbe5415ab52d6404e850 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 27 Mar 2025 22:32:14 +0800 Subject: [PATCH 3/6] revert clang-format 18 change --- cpp/src/arrow/util/async_generator.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index d4d78061d48..6490ebf5748 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1431,8 +1431,9 @@ class MergedGenerator { immediate_inner(next_item.result()); if (immediate_inner.was_empty) { Future> next_source = state->PullSource(); - if (next_source.TryAddCallback( - [this] { return OuterCallback{state, index}; })) { + if (next_source.TryAddCallback([this] { + return OuterCallback{state, index}; + })) { // We hit an unfinished future so we can stop looping return; } From bfcfccedf27afb5da6ec07f78487c92c0d189ec6 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 28 Mar 2025 00:53:38 +0800 Subject: [PATCH 4/6] Apply suggestions --- cpp/src/arrow/util/async_generator.h | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 6490ebf5748..f823769a2d4 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -750,7 +750,7 @@ class ReadaheadGenerator { [state](const T& result) -> Future { bool mark_finished = false; { - auto guard = state->mu.Lock(); + auto guard = state->mutex.Lock(); state->MarkFinishedIfDone(result); --state->num_running; if (state->finished) { @@ -767,7 +767,7 @@ class ReadaheadGenerator { // tasks finish before we return the error. bool mark_finished = false; { - auto guard = state->mu.Lock(); + auto guard = state->mutex.Lock(); state->finished = true; --state->num_running; mark_finished = state->num_running == 0; @@ -783,7 +783,7 @@ class ReadaheadGenerator { if (state_->readahead_queue.empty()) { // This is the first request, let's pump the underlying queue { - auto guard = state_->mu.Lock(); + auto guard = state_->mutex.Lock(); state_->num_running = state_->max_readahead; } for (int i = 0; i < state_->max_readahead; i++) { @@ -795,7 +795,7 @@ class ReadaheadGenerator { // Pop one and add one auto result = std::move(state_->readahead_queue.front()); state_->readahead_queue.pop(); - auto guard = state_->mu.Lock(); + auto guard = state_->mutex.Lock(); if (state_->finished) { guard.Unlock(); state_->readahead_queue.push(AsyncGeneratorEnd()); @@ -816,7 +816,7 @@ class ReadaheadGenerator { : source_generator(std::move(source_generator)), max_readahead(max_readahead) {} void MarkFinishedIfDone(const T& next_result) { - // ASSERT_HELD(mu) + // ASSERT_HELD(mutex) if (IsIterationEnd(next_result)) { finished = true; } @@ -825,9 +825,9 @@ class ReadaheadGenerator { AsyncGenerator source_generator; int max_readahead; Future<> final_future = Future<>::Make(); - int num_running{0}; // GUARDED_BY(mu) - bool finished{false}; // GUARDED_BY(mu) - arrow::util::Mutex mu; + int num_running{0}; // GUARDED_BY(mutex) + bool finished{false}; // GUARDED_BY(mutex) + arrow::util::Mutex mutex; std::queue> readahead_queue; }; From c5a556d41a147965b6870a21ef87164d0d3366eb Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 31 Mar 2025 16:06:47 +0800 Subject: [PATCH 5/6] apply suggestion --- cpp/src/arrow/util/async_generator.h | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index f823769a2d4..d48553b68a5 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -795,13 +795,17 @@ class ReadaheadGenerator { // Pop one and add one auto result = std::move(state_->readahead_queue.front()); state_->readahead_queue.pop(); - auto guard = state_->mutex.Lock(); - if (state_->finished) { - guard.Unlock(); + bool is_finished = false; + { + auto guard = state_->mutex.Lock(); + is_finished = state_->finished; + if (!is_finished) { + ++state_->num_running; + } + } + if (is_finished) { state_->readahead_queue.push(AsyncGeneratorEnd()); } else { - ++state_->num_running; - guard.Unlock(); auto back_of_queue = state_->source_generator(); auto back_of_queue_after_check = AddMarkFinishedContinuation(std::move(back_of_queue)); From 30c7aa0b987e66b32fb8aace4d2cae12a31251bf Mon Sep 17 00:00:00 2001 From: mwish Date: Mon, 31 Mar 2025 22:23:10 +0800 Subject: [PATCH 6/6] Apply suggestions --- cpp/src/arrow/util/async_generator.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index d48553b68a5..57c6d9b5dde 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -784,6 +784,8 @@ class ReadaheadGenerator { // This is the first request, let's pump the underlying queue { auto guard = state_->mutex.Lock(); + // We're going to push to the queue below, but we need + // to update `num_running` while we're holding the lock. state_->num_running = state_->max_readahead; } for (int i = 0; i < state_->max_readahead; i++) { @@ -800,6 +802,8 @@ class ReadaheadGenerator { auto guard = state_->mutex.Lock(); is_finished = state_->finished; if (!is_finished) { + // We're going to push to the queue below, but we need + // to update `num_running` while we're holding the lock. ++state_->num_running; } }