From e82d4b2ea46feeab1cc18eeea4df33bb90f0751a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Apr 2021 10:37:45 -1000 Subject: [PATCH 1/7] ARROW-12220: The background generator kept reading from the source even after the downstream had given up on it. Other than the obvious memory / resource usage problems this also meant that callback handlers could reference deleted state downstream. Now we block the destructor until the background thread is finished and we stop the background thread early if all consumer references are lost. --- cpp/src/arrow/util/async_generator.h | 55 ++++++++++++++++++++-- cpp/src/arrow/util/async_generator_test.cc | 37 +++++++++++++++ 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index f034cea9983..9388f7c6feb 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1139,9 +1139,8 @@ class BackgroundGenerator { public: explicit BackgroundGenerator(Iterator it, internal::Executor* io_executor, int max_q, int q_restart) - : state_(std::make_shared(io_executor, std::move(it), max_q, q_restart)) {} - - ~BackgroundGenerator() {} + : state_(std::make_shared(io_executor, std::move(it), max_q, q_restart)), + cleanup_(std::make_shared(state_.get())) {} Future operator()() { auto guard = state_->mutex.Lock(); @@ -1175,10 +1174,14 @@ class BackgroundGenerator { State(internal::Executor* io_executor, Iterator it, int max_q, int q_restart) : io_executor(io_executor), it(std::move(it)), + started(false), running(false), finished(false), + should_shutdown(false), max_q(max_q), - q_restart(q_restart) {} + q_restart(q_restart) { + task_finished = Future<>::Make(); + } void ClearQueue() { while (!queue.empty()) { @@ -1187,6 +1190,7 @@ class BackgroundGenerator { } void RestartTask(std::shared_ptr state, util::Mutex::Guard guard) { + state->started = true; if (!finished) { running = true; auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); }); @@ -1202,25 +1206,48 @@ class BackgroundGenerator { ClearQueue(); queue.push(spawn_status); } + task_finished.MarkFinished(); } } } internal::Executor* io_executor; Iterator it; + bool started; bool running; bool finished; + bool should_shutdown; int max_q; int q_restart; std::queue> queue; util::optional> waiting_future; util::Mutex mutex; + Future<> task_finished; + }; + + struct Cleanup { + explicit Cleanup(State* state) : state(state) {} + ~Cleanup() { + Future<> finish_fut; + { + auto lock = state->mutex.Lock(); + if (!state->started) { + return; + } + state->should_shutdown = true; + finish_fut = state->task_finished; + } + // Using future as a condition variable here + Status st = finish_fut.status(); + ARROW_UNUSED(st); + } + State* state; }; class Task { public: void operator()(std::shared_ptr state) { - // while condition can't be based on state_ because it is run outside the mutex + // These conditions can't be based on state_ because they run outside the mutex bool running = true; while (running) { auto next = state->it.Next(); @@ -1229,6 +1256,12 @@ class BackgroundGenerator { { auto guard = state->mutex.Lock(); + if (state->should_shutdown) { + state->finished = true; + state->running = false; + break; + } + if (!next.ok() || IsIterationEnd(*next)) { state->finished = true; state->running = false; @@ -1254,10 +1287,22 @@ class BackgroundGenerator { waiting_future.MarkFinished(next); } } + // It's safe to access this outside the mutex. The only places finished is set to + // true are in this task and in a failure to spawn this task (in which case we are + // not here) + if (state->finished) { + state->task_finished.MarkFinished(); + } } }; std::shared_ptr state_; + // state_ is held by both the generator and the background thread so it won't be cleaned + // up when all consumer references are relinquished. cleanup_ is only held by the + // generator so it will be destructed when the last consumer reference is gone. We use + // this to cleanup / stop the background generator in case the consuming end stops + // listening (e.g. due to a downstream error) + std::shared_ptr cleanup_; }; constexpr int kDefaultBackgroundMaxQ = 32; diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 51e4f948d38..7d475a1f1fb 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -814,6 +814,43 @@ TEST_P(BackgroundGeneratorTestFixture, StopAndRestart) { AssertGeneratorExhausted(generator); } +struct TrackingIterator { + TrackingIterator(bool slow) : token(std::make_shared(false)), slow(slow) {} + + Result Next() { + if (slow) { + SleepABit(); + } + return TestInt(0); + } + std::weak_ptr GetWeakTargetRef() { return std::weak_ptr(token); } + + std::shared_ptr token; + bool slow; +}; + +TEST_P(BackgroundGeneratorTestFixture, AbortReading) { + // If there is an error downstream then it is likely the chain will abort and the + // background generator will lose all references and should abandon reading + TrackingIterator source(IsSlow()); + auto tracker = source.GetWeakTargetRef(); + auto iter = Iterator(std::move(source)); + std::shared_ptr> generator; + { + ASSERT_OK_AND_ASSIGN( + auto gen, MakeBackgroundGenerator(std::move(iter), internal::GetCpuThreadPool())); + generator = std::make_shared>(gen); + } + + // Poll one item to start it up + ASSERT_FINISHES_OK_AND_EQ(TestInt(0), (*generator)()); + ASSERT_FALSE(tracker.expired()); + // Remove last reference to generator, should trigger and wait for cleanup + generator.reset(); + // Cleanup should have ensured no more reference to the source + ASSERT_TRUE(tracker.expired()); +} + struct SlowEmptyIterator { Result Next() { if (called_) { From 7ab6659c2f8afcd52fa882cdcd433d845b5c6afe Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Apr 2021 14:10:05 -1000 Subject: [PATCH 2/7] ARROW-12220: The previous fix didn't properly consider all cases and could lead to deadlock. --- cpp/src/arrow/util/async_generator.h | 49 ++++++++++++---------- cpp/src/arrow/util/async_generator_test.cc | 20 +++++++++ cpp/src/arrow/util/thread_pool.cc | 6 +++ cpp/src/arrow/util/thread_pool.h | 3 ++ 4 files changed, 57 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 9388f7c6feb..6ad13a7d611 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1155,13 +1155,13 @@ class BackgroundGenerator { } else { auto next = Future::MakeFinished(std::move(state_->queue.front())); state_->queue.pop(); - if (!state_->running && + if (!state_->running_in_loop && static_cast(state_->queue.size()) <= state_->q_restart) { state_->RestartTask(state_, std::move(guard)); } return next; } - if (!state_->running) { + if (!state_->running_in_loop) { // This branch should only be needed to start the background thread on the first // call state_->RestartTask(state_, std::move(guard)); @@ -1174,8 +1174,8 @@ class BackgroundGenerator { State(internal::Executor* io_executor, Iterator it, int max_q, int q_restart) : io_executor(io_executor), it(std::move(it)), - started(false), - running(false), + running_in_loop(false), + running_at_all(false), finished(false), should_shutdown(false), max_q(max_q), @@ -1190,12 +1190,12 @@ class BackgroundGenerator { } void RestartTask(std::shared_ptr state, util::Mutex::Guard guard) { - state->started = true; + state->running_at_all = true; if (!finished) { - running = true; + running_in_loop = true; auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); }); if (!spawn_status.ok()) { - running = false; + running_in_loop = false; finished = true; if (waiting_future.has_value()) { auto to_deliver = std::move(waiting_future.value()); @@ -1213,8 +1213,14 @@ class BackgroundGenerator { internal::Executor* io_executor; Iterator it; - bool started; - bool running; + // True if we are still running in the loop and will be adding more items to the + // queue, don't restart the task if this is true. However, even if this is false we + // might still be running some finish callbacks or marking the finish future. + bool running_in_loop; + // If this is false then the background thread is done with everything. It will not + // be running any additional callbacks or marking the finish future. There is no need + // to wait for it when cleaning up. + bool running_at_all; bool finished; bool should_shutdown; int max_q; @@ -1231,7 +1237,7 @@ class BackgroundGenerator { Future<> finish_fut; { auto lock = state->mutex.Lock(); - if (!state->started) { + if (!state->running_at_all) { return; } state->should_shutdown = true; @@ -1248,8 +1254,8 @@ class BackgroundGenerator { public: void operator()(std::shared_ptr state) { // These conditions can't be based on state_ because they run outside the mutex - bool running = true; - while (running) { + bool running_in_loop = true; + while (running_in_loop) { auto next = state->it.Next(); // Need to capture state->waiting_future inside the mutex to mark finished outside Future waiting_future; @@ -1258,13 +1264,13 @@ class BackgroundGenerator { if (state->should_shutdown) { state->finished = true; - state->running = false; + state->running_in_loop = false; break; } if (!next.ok() || IsIterationEnd(*next)) { state->finished = true; - state->running = false; + state->running_in_loop = false; if (!next.ok()) { state->ClearQueue(); } @@ -1275,10 +1281,10 @@ class BackgroundGenerator { } else { state->queue.push(std::move(next)); if (static_cast(state->queue.size()) >= state->max_q) { - state->running = false; + state->running_in_loop = false; } } - running = state->running; + running_in_loop = state->running_in_loop; } // This must happen outside the task. Although presumably there is a transferring // generator on the other end that will quickly transfer any callbacks off of this @@ -1287,11 +1293,12 @@ class BackgroundGenerator { waiting_future.MarkFinished(next); } } - // It's safe to access this outside the mutex. The only places finished is set to - // true are in this task and in a failure to spawn this task (in which case we are - // not here) - if (state->finished) { - state->task_finished.MarkFinished(); + { + auto guard = state->mutex.Lock(); + state->running_at_all = false; + if (state->finished) { + state->task_finished.MarkFinished(); + } } } }; diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 7d475a1f1fb..fe58e442207 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -851,6 +851,26 @@ TEST_P(BackgroundGeneratorTestFixture, AbortReading) { ASSERT_TRUE(tracker.expired()); } +TEST_P(BackgroundGeneratorTestFixture, AbortOnIdleBackground) { + // Tests what happens when the downstream aborts while the background thread is idle + ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1)); + + auto source = PossiblySlowVectorIt(RangeVector(100), IsSlow()); + std::shared_ptr> generator; + { + ASSERT_OK_AND_ASSIGN(auto gen, + MakeBackgroundGenerator(std::move(source), thread_pool.get())); + generator = std::make_shared>(gen); + } + ASSERT_FINISHES_OK_AND_EQ(TestInt(0), (*generator)()); + + // The generator should pretty quickly fill up the queue and idle + BusyWait(10, [&thread_pool] { return thread_pool->GetNumTasks() == 0; }); + + // Now delete the generator and hope we don't deadlock + generator.reset(); +} + struct SlowEmptyIterator { Result Next() { if (called_) { diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 873b9335e74..cd523609d27 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -272,6 +272,12 @@ int ThreadPool::GetCapacity() { return state_->desired_capacity_; } +int ThreadPool::GetNumTasks() { + ProtectAgainstFork(); + std::unique_lock lock(state_->mutex_); + return state_->tasks_queued_or_running_; +} + int ThreadPool::GetActualCapacity() { ProtectAgainstFork(); std::unique_lock lock(state_->mutex_); diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index c4d4d1869c6..cd964385c6e 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -264,6 +264,9 @@ class ARROW_EXPORT ThreadPool : public Executor { // match this value. int GetCapacity() override; + // Return the number of tasks either running or in the queue. + int GetNumTasks(); + // Dynamically change the number of worker threads. // // This function always returns immediately. From 0b7628fab599fb4758cea18619951dc578034377 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Apr 2021 14:21:19 -1000 Subject: [PATCH 3/7] ARROW-12220: Lint --- cpp/src/arrow/util/async_generator_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index fe58e442207..8f2339b7335 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -815,7 +815,8 @@ TEST_P(BackgroundGeneratorTestFixture, StopAndRestart) { } struct TrackingIterator { - TrackingIterator(bool slow) : token(std::make_shared(false)), slow(slow) {} + explicit TrackingIterator(bool slow) + : token(std::make_shared(false)), slow(slow) {} Result Next() { if (slow) { From d04e603dd95871a388fe4d48727f8c847a6fc940 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Apr 2021 14:43:52 -1000 Subject: [PATCH 4/7] ARROW-12220: Need to be a little more lenient in unit tests. Slow PCs could fail to cleanup the background thread quickly enough for the unit test's satisfaction --- cpp/src/arrow/util/async_generator_test.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 8f2339b7335..dfb60ce7a7d 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -848,8 +848,9 @@ TEST_P(BackgroundGeneratorTestFixture, AbortReading) { ASSERT_FALSE(tracker.expired()); // Remove last reference to generator, should trigger and wait for cleanup generator.reset(); - // Cleanup should have ensured no more reference to the source - ASSERT_TRUE(tracker.expired()); + // Cleanup should have ensured no more reference to the source. It may take a moment + // to expire because the background thread has to destruct itself + BusyWait(10, [&tracker] { return tracker.expired(); }); } TEST_P(BackgroundGeneratorTestFixture, AbortOnIdleBackground) { From 9f6d766487af5cf803725ba0ea8e1a4d4adaaec2 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 8 Apr 2021 17:18:43 -1000 Subject: [PATCH 5/7] ARROW-12220: Based on PR feedback I am attempting to simplify things and reduce race conditions --- cpp/src/arrow/util/async_generator.h | 152 +++++++++++++++++---------- 1 file changed, 94 insertions(+), 58 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 6ad13a7d611..49c85eea39f 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1155,33 +1155,29 @@ class BackgroundGenerator { } else { auto next = Future::MakeFinished(std::move(state_->queue.front())); state_->queue.pop(); - if (!state_->running_in_loop && - static_cast(state_->queue.size()) <= state_->q_restart) { - state_->RestartTask(state_, std::move(guard)); + if (state_->NeedsRestart()) { + return state_->RestartTask(state_, std::move(guard), std::move(next)); } return next; } - if (!state_->running_in_loop) { - // This branch should only be needed to start the background thread on the first - // call - state_->RestartTask(state_, std::move(guard)); + // This should only trigger the very first time this method is called + if (state_->NeedsRestart()) { + return state_->RestartTask(state_, std::move(guard), std::move(waiting_future)); } return waiting_future; } protected: + enum class BackgroundThreadState : char { Reading = 0, Quitting = 1, Idle = 2 }; struct State { State(internal::Executor* io_executor, Iterator it, int max_q, int q_restart) : io_executor(io_executor), + max_q(max_q), + q_restart(q_restart), it(std::move(it)), - running_in_loop(false), - running_at_all(false), + reading(false), finished(false), - should_shutdown(false), - max_q(max_q), - q_restart(q_restart) { - task_finished = Future<>::Make(); - } + should_shutdown(false) {} void ClearQueue() { while (!queue.empty()) { @@ -1189,57 +1185,89 @@ class BackgroundGenerator { } } - void RestartTask(std::shared_ptr state, util::Mutex::Guard guard) { - state->running_at_all = true; - if (!finished) { - running_in_loop = true; - auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); }); - if (!spawn_status.ok()) { - running_in_loop = false; - finished = true; - if (waiting_future.has_value()) { - auto to_deliver = std::move(waiting_future.value()); - waiting_future.reset(); - guard.Unlock(); - to_deliver.MarkFinished(spawn_status); - } else { - ClearQueue(); - queue.push(spawn_status); - } - task_finished.MarkFinished(); + bool TaskIsRunning() const { return task_finished.is_valid(); } + + bool NeedsRestart() const { + return !finished && !reading && static_cast(queue.size()) <= q_restart; + } + + void DoRestartTask(std::shared_ptr state, util::Mutex::Guard guard) { + // If we get here we are actually going to start a new task so let's create a + // task_finished future for it + state->task_finished = Future<>::Make(); + state->reading = true; + auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); }); + if (!spawn_status.ok()) { + // If we can't spawn a new task then send an error to the consumer (either via a + // waiting future or the queue) and mark ourselves finished + state->finished = true; + state->task_finished = Future<>(); + if (waiting_future.has_value()) { + auto to_deliver = std::move(waiting_future.value()); + waiting_future.reset(); + guard.Unlock(); + to_deliver.MarkFinished(spawn_status); + } else { + ClearQueue(); + queue.push(spawn_status); } } } + Future RestartTask(std::shared_ptr state, util::Mutex::Guard guard, + Future next) { + if (TaskIsRunning()) { + // If the task is still cleaning up we need to wait for it to finish before + // restarting. We also want to block the consumer until we've restarted the + // reader to avoid multiple restarts + return task_finished.Then([state, next](...) { + // This may appear dangerous (recursive mutex) but we should be guaranteed the + // outer guard has been released by this point. We know... + // * task_finished is not already finished (it would be invalid in that case) + // * task_finished will not be marked complete until we've given up the mutex + auto guard_ = state->mutex.Lock(); + state->DoRestartTask(state, std::move(guard_)); + return next; + }); + } + // Otherwise we can restart immediately + DoRestartTask(std::move(state), std::move(guard)); + return next; + } + internal::Executor* io_executor; + const int max_q; + const int q_restart; Iterator it; - // True if we are still running in the loop and will be adding more items to the - // queue, don't restart the task if this is true. However, even if this is false we - // might still be running some finish callbacks or marking the finish future. - bool running_in_loop; - // If this is false then the background thread is done with everything. It will not - // be running any additional callbacks or marking the finish future. There is no need - // to wait for it when cleaning up. - bool running_at_all; + + // If true, the task is actively pumping items from the queue and does not need a + // restart + bool reading; + // Set to true when a terminal item arrives bool finished; + // Signal to the background task to end early because consumers have given up on it bool should_shutdown; - int max_q; - int q_restart; + // If the queue is empty then the consumer will create a waiting future and wait for + // it std::queue> queue; util::optional> waiting_future; - util::Mutex mutex; + // Every background task is given a future to complete when it is entirely finished + // processing and ready for the next task to start or for State to be destroyed Future<> task_finished; + util::Mutex mutex; }; + // Cleanup task that will be run when all consumer references to the generator are lost struct Cleanup { explicit Cleanup(State* state) : state(state) {} ~Cleanup() { Future<> finish_fut; { auto lock = state->mutex.Lock(); - if (!state->running_at_all) { + if (!state->TaskIsRunning()) { return; } + // Signal the current task to stop and wait for it to finish state->should_shutdown = true; finish_fut = state->task_finished; } @@ -1253,9 +1281,9 @@ class BackgroundGenerator { class Task { public: void operator()(std::shared_ptr state) { - // These conditions can't be based on state_ because they run outside the mutex - bool running_in_loop = true; - while (running_in_loop) { + // We need to capture the state to read while outside the mutex + bool reading = true; + while (reading) { auto next = state->it.Next(); // Need to capture state->waiting_future inside the mutex to mark finished outside Future waiting_future; @@ -1264,42 +1292,50 @@ class BackgroundGenerator { if (state->should_shutdown) { state->finished = true; - state->running_in_loop = false; break; } if (!next.ok() || IsIterationEnd(*next)) { + // Terminal item. Mark finished to true, send this last item, and quit state->finished = true; - state->running_in_loop = false; if (!next.ok()) { state->ClearQueue(); } } + // At this point we are going to send an item. Either we will add it to the + // queue or deliver it to a waiting future. if (state->waiting_future.has_value()) { waiting_future = std::move(state->waiting_future.value()); state->waiting_future.reset(); } else { state->queue.push(std::move(next)); + // We just filled up the queue so it is time to quit. We may need to notify + // a cleanup task so we transition to Quitting if (static_cast(state->queue.size()) >= state->max_q) { - state->running_in_loop = false; + state->reading = false; } } - running_in_loop = state->running_in_loop; + reading = state->reading && !state->finished; } - // This must happen outside the task. Although presumably there is a transferring - // generator on the other end that will quickly transfer any callbacks off of this - // thread so we can continue looping. Still, best not to rely on that + // This should happen outside the mutex. Presumably there is a + // transferring generator on the other end that will quickly transfer any + // callbacks off of this thread so we can continue looping. Still, best not to + // rely on that if (waiting_future.is_valid()) { waiting_future.MarkFinished(next); } } + // Once we've sent our last item we can notify any waiters that we are done and so + // either state can be cleaned up or a new background task can be started + Future<> task_finished; { auto guard = state->mutex.Lock(); - state->running_at_all = false; - if (state->finished) { - state->task_finished.MarkFinished(); - } + // After we give up the mutex state can be safely deleted. We will no longer + // reference it. We can safely transition to idle now. + task_finished = state->task_finished; + state->task_finished = Future<>(); } + task_finished.MarkFinished(); } }; From 4b493134044d410b7e29c616c7c74abfe4c09cd0 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 9 Apr 2021 07:05:03 -1000 Subject: [PATCH 6/7] ARROW-12220: Forgot to remove an enum leftover from a failed approach --- cpp/src/arrow/util/async_generator.h | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 49c85eea39f..9c97a44fb90 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1168,7 +1168,6 @@ class BackgroundGenerator { } protected: - enum class BackgroundThreadState : char { Reading = 0, Quitting = 1, Idle = 2 }; struct State { State(internal::Executor* io_executor, Iterator it, int max_q, int q_restart) : io_executor(io_executor), From 926f799cd16ab234d0f6ae097420e30cd1f92c33 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 12 Apr 2021 11:09:48 -1000 Subject: [PATCH 7/7] ARROW-12220: Moved Task to a static method on BackgroundGenerator --- cpp/src/arrow/util/async_generator.h | 104 +++++++++++++-------------- 1 file changed, 51 insertions(+), 53 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 9c97a44fb90..0f3e9205f0c 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1195,7 +1195,8 @@ class BackgroundGenerator { // task_finished future for it state->task_finished = Future<>::Make(); state->reading = true; - auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); }); + auto spawn_status = io_executor->Spawn( + [state]() { BackgroundGenerator::WorkerTask(std::move(state)); }); if (!spawn_status.ok()) { // If we can't spawn a new task then send an error to the consumer (either via a // waiting future or the queue) and mark ourselves finished @@ -1277,66 +1278,63 @@ class BackgroundGenerator { State* state; }; - class Task { - public: - void operator()(std::shared_ptr state) { - // We need to capture the state to read while outside the mutex - bool reading = true; - while (reading) { - auto next = state->it.Next(); - // Need to capture state->waiting_future inside the mutex to mark finished outside - Future waiting_future; - { - auto guard = state->mutex.Lock(); + static void WorkerTask(std::shared_ptr state) { + // We need to capture the state to read while outside the mutex + bool reading = true; + while (reading) { + auto next = state->it.Next(); + // Need to capture state->waiting_future inside the mutex to mark finished outside + Future waiting_future; + { + auto guard = state->mutex.Lock(); - if (state->should_shutdown) { - state->finished = true; - break; - } + if (state->should_shutdown) { + state->finished = true; + break; + } - if (!next.ok() || IsIterationEnd(*next)) { - // Terminal item. Mark finished to true, send this last item, and quit - state->finished = true; - if (!next.ok()) { - state->ClearQueue(); - } - } - // At this point we are going to send an item. Either we will add it to the - // queue or deliver it to a waiting future. - if (state->waiting_future.has_value()) { - waiting_future = std::move(state->waiting_future.value()); - state->waiting_future.reset(); - } else { - state->queue.push(std::move(next)); - // We just filled up the queue so it is time to quit. We may need to notify - // a cleanup task so we transition to Quitting - if (static_cast(state->queue.size()) >= state->max_q) { - state->reading = false; - } + if (!next.ok() || IsIterationEnd(*next)) { + // Terminal item. Mark finished to true, send this last item, and quit + state->finished = true; + if (!next.ok()) { + state->ClearQueue(); } - reading = state->reading && !state->finished; } - // This should happen outside the mutex. Presumably there is a - // transferring generator on the other end that will quickly transfer any - // callbacks off of this thread so we can continue looping. Still, best not to - // rely on that - if (waiting_future.is_valid()) { - waiting_future.MarkFinished(next); + // At this point we are going to send an item. Either we will add it to the + // queue or deliver it to a waiting future. + if (state->waiting_future.has_value()) { + waiting_future = std::move(state->waiting_future.value()); + state->waiting_future.reset(); + } else { + state->queue.push(std::move(next)); + // We just filled up the queue so it is time to quit. We may need to notify + // a cleanup task so we transition to Quitting + if (static_cast(state->queue.size()) >= state->max_q) { + state->reading = false; + } } + reading = state->reading && !state->finished; } - // Once we've sent our last item we can notify any waiters that we are done and so - // either state can be cleaned up or a new background task can be started - Future<> task_finished; - { - auto guard = state->mutex.Lock(); - // After we give up the mutex state can be safely deleted. We will no longer - // reference it. We can safely transition to idle now. - task_finished = state->task_finished; - state->task_finished = Future<>(); + // This should happen outside the mutex. Presumably there is a + // transferring generator on the other end that will quickly transfer any + // callbacks off of this thread so we can continue looping. Still, best not to + // rely on that + if (waiting_future.is_valid()) { + waiting_future.MarkFinished(next); } - task_finished.MarkFinished(); } - }; + // Once we've sent our last item we can notify any waiters that we are done and so + // either state can be cleaned up or a new background task can be started + Future<> task_finished; + { + auto guard = state->mutex.Lock(); + // After we give up the mutex state can be safely deleted. We will no longer + // reference it. We can safely transition to idle now. + task_finished = state->task_finished; + state->task_finished = Future<>(); + } + task_finished.MarkFinished(); + } std::shared_ptr state_; // state_ is held by both the generator and the background thread so it won't be cleaned