From c169e3455fbc686975a84f19960caccb20e5213d Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 26 Mar 2021 02:55:33 -1000 Subject: [PATCH] ARROW-12097: Working on a new background generator that does not spawn a thread task per execution ARROW-12097: WIP on new background generator ARROW-12097: Forgot to switch the async file reader over to the new background reader ARROW-12097: Added check to make sure we are handling invalid result from spawn. Added some test cases ARROW-12097: Removing leftover comment ARROW-12097: Fixed another spot where we were ignoring a Status return value ARROW-12097: Subtle race condition hidden by using Future<>.is_valid when it was better to just use an optional ARROW-12097: The BadResult test could fail under certain timing conditions. I needed to loosen up how the check interpreted success. --- cpp/src/arrow/csv/reader.cc | 21 +- cpp/src/arrow/util/async_generator.h | 182 +++++++++--- cpp/src/arrow/util/async_generator_test.cc | 327 +++++++++++++++++---- cpp/src/arrow/util/iterator.h | 5 + cpp/src/arrow/util/test_common.h | 14 + 5 files changed, 441 insertions(+), 108 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 4eaed420e7f2..0e86df26ad82 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -703,14 +703,11 @@ class SerialStreamingReader : public BaseStreamingReader, ARROW_ASSIGN_OR_RAISE(auto istream_it, io::MakeInputStreamIterator(input_, read_options_.block_size)); + // TODO Consider exposing readahead as a read option (ARROW-12090) ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor())); - // TODO Consider exposing readahead as a read option (ARROW-12090) - auto rh_it = - MakeSerialReadaheadGenerator(std::move(bg_it), cpu_executor_->GetCapacity()); - - auto transferred_it = MakeTransferredGenerator(rh_it, cpu_executor_); + auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_); buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it)); task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token()); @@ -909,15 +906,15 @@ class AsyncThreadedTableReader ARROW_ASSIGN_OR_RAISE(auto istream_it, io::MakeInputStreamIterator(input_, read_options_.block_size)); - ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it), - io_context_.executor())); + int max_readahead = cpu_executor_->GetCapacity(); + int readahead_restart = std::max(1, max_readahead / 2); - auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_); + ARROW_ASSIGN_OR_RAISE( + auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor(), + max_readahead, readahead_restart)); - int32_t block_queue_size = cpu_executor_->GetCapacity(); - auto rh_it = - MakeSerialReadaheadGenerator(std::move(transferred_it), block_queue_size); - buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(rh_it)); + auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_); + buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it)); return Status::OK(); } diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index fc58c3d180bf..2a7ff46c4d21 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1096,65 +1096,158 @@ AsyncGenerator MakeIteratorGenerator(Iterator it) { template class BackgroundGenerator { public: - explicit BackgroundGenerator(Iterator it, internal::Executor* io_executor) - : io_executor_(io_executor) { - task_ = Task{std::make_shared>(std::move(it)), - std::make_shared>(false)}; - } - - ~BackgroundGenerator() { - // The thread pool will be disposed of automatically. By default it will not wait - // so the background thread may outlive this object. That should be ok. Any task - // objects in the thread pool are copies of task_ and have their own shared_ptr to - // the iterator. - } + explicit BackgroundGenerator(Iterator it, internal::Executor* io_executor, int max_q, + int q_restart) + : state_(std::make_shared(io_executor, std::move(it), max_q, q_restart)) {} - ARROW_DEFAULT_MOVE_AND_ASSIGN(BackgroundGenerator); - ARROW_DISALLOW_COPY_AND_ASSIGN(BackgroundGenerator); + ~BackgroundGenerator() {} Future operator()() { - auto submitted_future = io_executor_->Submit(task_); - if (!submitted_future.ok()) { - return Future::MakeFinished(submitted_future.status()); + auto guard = state_->mutex.Lock(); + Future waiting_future; + if (state_->queue.empty()) { + if (state_->finished) { + return AsyncGeneratorEnd(); + } else { + waiting_future = Future::Make(); + state_->waiting_future = waiting_future; + } + } else { + auto next = Future::MakeFinished(std::move(state_->queue.front())); + state_->queue.pop(); + if (!state_->running && + static_cast(state_->queue.size()) <= state_->q_restart) { + state_->RestartTask(state_, std::move(guard)); + } + return next; + } + if (!state_->running) { + // This branch should only be needed to start the background thread on the first + // call + state_->RestartTask(state_, std::move(guard)); } - return std::move(*submitted_future); + return waiting_future; } protected: - struct Task { - Result operator()() { - if (*done_) { - return IterationTraits::End(); + struct State { + State(internal::Executor* io_executor, Iterator it, int max_q, int q_restart) + : io_executor(io_executor), + it(std::move(it)), + running(false), + finished(false), + max_q(max_q), + q_restart(q_restart) {} + + void ClearQueue() { + while (!queue.empty()) { + queue.pop(); } - auto next = it_->Next(); - if (!next.ok() || IsIterationEnd(*next)) { - *done_ = true; + } + + void RestartTask(std::shared_ptr state, util::Mutex::Guard guard) { + if (!finished) { + running = true; + auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); }); + if (!spawn_status.ok()) { + running = false; + finished = true; + if (waiting_future.has_value()) { + auto to_deliver = std::move(waiting_future.value()); + waiting_future.reset(); + guard.Unlock(); + to_deliver.MarkFinished(spawn_status); + } else { + ClearQueue(); + queue.push(spawn_status); + } + } } - return next; } - // This task is going to be copied so we need to convert the iterator ptr to - // a shared ptr. This should be safe however because the background executor only - // has a single thread so it can't access it_ across multiple threads. - std::shared_ptr> it_; - std::shared_ptr> done_; + + internal::Executor* io_executor; + Iterator it; + bool running; + bool finished; + int max_q; + int q_restart; + std::queue> queue; + util::optional> waiting_future; + util::Mutex mutex; }; - Task task_; - internal::Executor* io_executor_; + class Task { + public: + void operator()(std::shared_ptr state) { + // while condition can't be based on state_ because it is run outside the mutex + bool running = true; + while (running) { + auto next = state->it.Next(); + // Need to capture state->waiting_future inside the mutex to mark finished outside + Future waiting_future; + { + auto guard = state->mutex.Lock(); + + if (!next.ok() || IsIterationEnd(*next)) { + state->finished = true; + state->running = false; + if (!next.ok()) { + state->ClearQueue(); + } + } + if (state->waiting_future.has_value()) { + waiting_future = std::move(state->waiting_future.value()); + state->waiting_future.reset(); + } else { + state->queue.push(std::move(next)); + if (static_cast(state->queue.size()) >= state->max_q) { + state->running = false; + } + } + running = state->running; + } + // This must happen outside the task. Although presumably there is a transferring + // generator on the other end that will quickly transfer any callbacks off of this + // thread so we can continue looping. Still, best not to rely on that + if (waiting_future.is_valid()) { + waiting_future.MarkFinished(next); + } + } + } + }; + + std::shared_ptr state_; }; +constexpr int kDefaultBackgroundMaxQ = 32; +constexpr int kDefaultBackgroundQRestart = 16; + /// \brief Creates an AsyncGenerator by iterating over an Iterator on a background /// thread /// -/// This generator is async-reentrant +/// The parameter max_q and q_restart control queue size and background thread task +/// management. If the background task is fast you typically don't want it creating a +/// thread task for every item. Instead the background thread will run until it fills +/// up a readahead queue. /// -/// This generator will not queue +/// Once the queue has filled up the background thread task will terminate (allowing other +/// I/O tasks to use the thread). Once the queue has been drained enough (specified by +/// q_restart) then the background thread task will be restarted. If q_restart is too low +/// then you may exhaust the queue waiting for the background thread task to start running +/// again. If it is too high then it will be constantly stopping and restarting the +/// background queue task +/// +/// This generator is not async-reentrant +/// +/// This generator will queue up to max_q blocks template static Result> MakeBackgroundGenerator( - Iterator iterator, internal::Executor* io_executor) { - auto background_iterator = std::make_shared>( - std::move(iterator), std::move(io_executor)); - return [background_iterator]() { return (*background_iterator)(); }; + Iterator iterator, internal::Executor* io_executor, + int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) { + if (max_q < q_restart) { + return Status::Invalid("max_q must be >= q_restart"); + } + return BackgroundGenerator(std::move(iterator), io_executor, max_q, q_restart); } /// \see MakeGeneratorIterator @@ -1185,16 +1278,17 @@ Result> MakeGeneratorIterator(AsyncGenerator source) { template Result> MakeReadaheadIterator(Iterator it, int readahead_queue_size) { ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1)); - ARROW_ASSIGN_OR_RAISE(auto background_generator, - MakeBackgroundGenerator(std::move(it), io_executor.get())); + auto max_q = readahead_queue_size; + auto q_restart = std::max(1, max_q / 2); + ARROW_ASSIGN_OR_RAISE( + auto background_generator, + MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart)); // Capture io_executor to keep it alive as long as owned_bg_generator is still // referenced AsyncGenerator owned_bg_generator = [io_executor, background_generator]() { return background_generator(); }; - auto readahead_generator = - MakeReadaheadGenerator(std::move(owned_bg_generator), readahead_queue_size); - return MakeGeneratorIterator(std::move(readahead_generator)); + return MakeGeneratorIterator(std::move(owned_bg_generator)); } } // namespace arrow diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index a2e87824bf46..518422de5866 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -81,8 +81,20 @@ class TrackingGenerator { }; // Yields items with a small pause between each one from a background thread -std::function()> BackgroundAsyncVectorIt(std::vector v, - bool sleep = true) { +std::function()> BackgroundAsyncVectorIt( + std::vector v, bool sleep = true, int max_q = kDefaultBackgroundMaxQ, + int q_restart = kDefaultBackgroundQRestart) { + auto pool = internal::GetCpuThreadPool(); + auto slow_iterator = PossiblySlowVectorIt(v, sleep); + EXPECT_OK_AND_ASSIGN( + auto background, + MakeBackgroundGenerator(std::move(slow_iterator), + internal::GetCpuThreadPool(), max_q, q_restart)); + return MakeTransferredGenerator(background, pool); +} + +std::function()> NewBackgroundAsyncVectorIt(std::vector v, + bool sleep = true) { auto pool = internal::GetCpuThreadPool(); auto iterator = VectorIt(v); auto slow_iterator = MakeTransformedIterator( @@ -205,6 +217,115 @@ ReentrantCheckerGuard ExpectNotAccessedReentrantly(AsyncGenerator* generat return ReentrantCheckerGuard(reentrant_checker); } +class GeneratorTestFixture : public ::testing::TestWithParam { + protected: + AsyncGenerator MakeSource(const std::vector& items) { + std::vector wrapped(items.begin(), items.end()); + auto gen = AsyncVectorIt(std::move(wrapped)); + bool slow = GetParam(); + if (slow) { + return SlowdownABit(std::move(gen)); + } + return gen; + } + + AsyncGenerator MakeFailingSource() { + AsyncGenerator gen = [] { + return Future::MakeFinished(Status::Invalid("XYZ")); + }; + bool slow = GetParam(); + if (slow) { + return SlowdownABit(std::move(gen)); + } + return gen; + } + + int GetNumItersForStress() { + bool slow = GetParam(); + // Run fewer trials for the slow case since they take longer + if (slow) { + return 10; + } else { + return 100; + } + } +}; + +template +class ManualIteratorControl { + public: + virtual ~ManualIteratorControl() {} + virtual void Push(Result result) = 0; + virtual uint32_t times_polled() = 0; +}; + +template +class PushIterator : public ManualIteratorControl { + public: + PushIterator() : state_(std::make_shared()) {} + virtual ~PushIterator() {} + + Result Next() { + std::unique_lock lk(state_->mx); + state_->times_polled++; + if (!state_->cv.wait_for(lk, std::chrono::seconds(300), + [&] { return !state_->items.empty(); })) { + return Status::Invalid("Timed out waiting for PushIterator"); + } + auto next = std::move(state_->items.front()); + state_->items.pop(); + return next; + } + + void Push(Result result) override { + { + std::lock_guard lg(state_->mx); + state_->items.push(std::move(result)); + } + state_->cv.notify_one(); + } + + uint32_t times_polled() override { + std::lock_guard lg(state_->mx); + return state_->times_polled; + } + + private: + struct State { + uint32_t times_polled = 0; + std::mutex mx; + std::condition_variable cv; + std::queue> items; + }; + + std::shared_ptr state_; +}; + +template +Iterator MakePushIterator(std::shared_ptr>* out) { + auto iter = std::make_shared>(); + *out = iter; + return Iterator(*iter); +} + +template +class ManualGenerator { + public: + ManualGenerator() : times_polled_(std::make_shared()) {} + + Future operator()() { + (*times_polled_)++; + return source_(); + } + + uint32_t times_polled() const { return *times_polled_; } + typename PushGenerator::Producer producer() { return source_.producer(); } + + private: + PushGenerator source_; + std::shared_ptr times_polled_; +}; + TEST(TestAsyncUtil, Visit) { auto generator = AsyncVectorIt({1, 2, 3}); unsigned int sum = 0; @@ -340,41 +461,9 @@ TEST(TestAsyncUtil, Concatenated) { AssertAsyncGeneratorMatch(expected, concat); } -class GeneratorTestFixture : public ::testing::TestWithParam { - protected: - AsyncGenerator MakeSource(const std::vector& items) { - std::vector wrapped(items.begin(), items.end()); - auto gen = AsyncVectorIt(std::move(wrapped)); - bool slow = GetParam(); - if (slow) { - return SlowdownABit(std::move(gen)); - } - return gen; - } +class MergedGeneratorTestFixture : public GeneratorTestFixture {}; - AsyncGenerator MakeFailingSource() { - AsyncGenerator gen = [] { - return Future::MakeFinished(Status::Invalid("XYZ")); - }; - bool slow = GetParam(); - if (slow) { - return SlowdownABit(std::move(gen)); - } - return gen; - } - - int GetNumItersForStress() { - bool slow = GetParam(); - // Run fewer trials for the slow case since they take longer - if (slow) { - return 10; - } else { - return 100; - } - } -}; - -TEST_P(GeneratorTestFixture, Merged) { +TEST_P(MergedGeneratorTestFixture, Merged) { auto gen = AsyncVectorIt>( {MakeSource({1, 2, 3}), MakeSource({4, 5, 6})}); @@ -388,14 +477,14 @@ TEST_P(GeneratorTestFixture, Merged) { ASSERT_EQ(expected, concat_set); } -TEST_P(GeneratorTestFixture, MergedInnerFail) { +TEST_P(MergedGeneratorTestFixture, MergedInnerFail) { auto gen = AsyncVectorIt>( {MakeSource({1, 2, 3}), MakeFailingSource()}); auto merged_gen = MakeMergedGenerator(gen, 10); ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(merged_gen)); } -TEST_P(GeneratorTestFixture, MergedOuterFail) { +TEST_P(MergedGeneratorTestFixture, MergedOuterFail) { auto gen = FailsAt(AsyncVectorIt>( {MakeSource({1, 2, 3}), MakeSource({1, 2, 3}), MakeSource({1, 2, 3})}), @@ -404,7 +493,7 @@ TEST_P(GeneratorTestFixture, MergedOuterFail) { ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(merged_gen)); } -TEST_P(GeneratorTestFixture, MergedLimitedSubscriptions) { +TEST_P(MergedGeneratorTestFixture, MergedLimitedSubscriptions) { auto gen = AsyncVectorIt>( {MakeSource({1, 2}), MakeSource({3, 4}), MakeSource({5, 6, 7, 8}), MakeSource({9, 10, 11, 12})}); @@ -445,7 +534,7 @@ TEST_P(GeneratorTestFixture, MergedLimitedSubscriptions) { AssertGeneratorExhausted(merged); } -TEST_P(GeneratorTestFixture, MergedStress) { +TEST_P(MergedGeneratorTestFixture, MergedStress) { constexpr int NGENERATORS = 10; constexpr int NITEMS = 10; for (int i = 0; i < GetNumItersForStress(); i++) { @@ -464,7 +553,7 @@ TEST_P(GeneratorTestFixture, MergedStress) { } } -TEST_P(GeneratorTestFixture, MergedParallelStress) { +TEST_P(MergedGeneratorTestFixture, MergedParallelStress) { constexpr int NGENERATORS = 10; constexpr int NITEMS = 10; for (int i = 0; i < GetNumItersForStress(); i++) { @@ -479,7 +568,7 @@ TEST_P(GeneratorTestFixture, MergedParallelStress) { } } -INSTANTIATE_TEST_SUITE_P(GeneratorTests, GeneratorTestFixture, +INSTANTIATE_TEST_SUITE_P(MergedGeneratorTests, GeneratorTestFixture, ::testing::Values(false, true)); TEST(TestAsyncUtil, FromVector) { @@ -570,14 +659,138 @@ TEST(TestAsyncUtil, StackOverflow) { #endif -TEST(TestAsyncUtil, Background) { +class BackgroundGeneratorTestFixture : public GeneratorTestFixture { + protected: + AsyncGenerator Make(const std::vector& it, + int max_q = kDefaultBackgroundMaxQ, + int q_restart = kDefaultBackgroundQRestart) { + bool slow = GetParam(); + return BackgroundAsyncVectorIt(it, slow, max_q, q_restart); + } +}; + +TEST_P(BackgroundGeneratorTestFixture, Empty) { + auto background = Make({}); + AssertGeneratorExhausted(background); +} + +TEST_P(BackgroundGeneratorTestFixture, Basic) { std::vector expected = {1, 2, 3}; - auto background = BackgroundAsyncVectorIt(expected); + auto background = Make(expected); auto future = CollectAsyncGenerator(background); ASSERT_FINISHES_OK_AND_ASSIGN(auto collected, future); ASSERT_EQ(expected, collected); } +TEST_P(BackgroundGeneratorTestFixture, BadResult) { + std::shared_ptr> iterator_control; + auto iterator = MakePushIterator(&iterator_control); + // Enough valid items to fill the queue and then some + for (int i = 0; i < 5; i++) { + iterator_control->Push(i); + } + // Next fail + iterator_control->Push(Status::Invalid("XYZ")); + ASSERT_OK_AND_ASSIGN( + auto generator, + MakeBackgroundGenerator(std::move(iterator), internal::GetCpuThreadPool(), 4, 2)); + + ASSERT_FINISHES_OK_AND_EQ(TestInt(0), generator()); + // Have not yet restarted so next results should always be valid + ASSERT_FINISHES_OK_AND_EQ(TestInt(1), generator()); + // Next three results may or may not be valid. + // The typical case is the call for TestInt(2) restarts a full queue and then maybe + // TestInt(3) and TestInt(4) arrive quickly enough to not get pre-empted or maybe + // they don't. + // + // A more bizarre, but possible, case is the checking thread falls behind the producer + // thread just so and TestInt(1) arrives and is delivered but before the call for + // TestInt(2) happens the background reader reads 2, 3, 4, and 5[err] into the queue so + // the queue never fills up and even TestInt(2) is preempted. + bool invalid_encountered = false; + for (int i = 0; i < 3; i++) { + auto next_fut = generator(); + auto next_result = next_fut.result(); + if (next_result.ok()) { + ASSERT_EQ(TestInt(i + 2), next_result.ValueUnsafe()); + } else { + invalid_encountered = true; + break; + } + } + // If both of the next two results are valid then this one will surely be invalid + if (!invalid_encountered) { + ASSERT_FINISHES_AND_RAISES(Invalid, generator()); + } + AssertGeneratorExhausted(generator); +} + +TEST_P(BackgroundGeneratorTestFixture, InvalidExecutor) { + std::vector expected = {1, 2, 3, 4, 5, 6, 7, 8}; + // Case 1: waiting future + auto slow = GetParam(); + auto it = PossiblySlowVectorIt(expected, slow); + ASSERT_OK_AND_ASSIGN(auto invalid_executor, internal::ThreadPool::Make(1)); + ASSERT_OK(invalid_executor->Shutdown()); + ASSERT_OK_AND_ASSIGN(auto background, MakeBackgroundGenerator( + std::move(it), invalid_executor.get(), 4, 2)); + ASSERT_FINISHES_AND_RAISES(Invalid, background()); + + // Case 2: Queue bad result + it = PossiblySlowVectorIt(expected, slow); + ASSERT_OK_AND_ASSIGN(invalid_executor, internal::ThreadPool::Make(1)); + ASSERT_OK_AND_ASSIGN( + background, MakeBackgroundGenerator(std::move(it), invalid_executor.get(), 4, 2)); + ASSERT_FINISHES_OK_AND_EQ(TestInt(1), background()); + ASSERT_OK(invalid_executor->Shutdown()); + // Next two are ok because queue is shutdown + ASSERT_FINISHES_OK_AND_EQ(TestInt(2), background()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(3), background()); + // Now the queue should have tried (and failed) to start back up + ASSERT_FINISHES_AND_RAISES(Invalid, background()); +} + +TEST_P(BackgroundGeneratorTestFixture, StopAndRestart) { + std::shared_ptr> iterator_control; + auto iterator = MakePushIterator(&iterator_control); + // Start with 6 items in the source + for (int i = 0; i < 6; i++) { + iterator_control->Push(i); + } + iterator_control->Push(IterationEnd()); + + ASSERT_OK_AND_ASSIGN( + auto generator, + MakeBackgroundGenerator(std::move(iterator), internal::GetCpuThreadPool(), 4, 2)); + SleepABit(); + // Lazy, should not start until polled once + ASSERT_EQ(iterator_control->times_polled(), 0); + // First poll should trigger 5 reads (1 for the polled value, 4 for the queue) + auto next = generator(); + BusyWait(10, [&] { return iterator_control->times_polled() >= 5; }); + // And then stop and not read any more + SleepABit(); + ASSERT_EQ(iterator_control->times_polled(), 5); + + ASSERT_FINISHES_OK_AND_EQ(TestInt(0), next); + // One more read should bring q down to 3 and should not restart + ASSERT_FINISHES_OK_AND_EQ(TestInt(1), generator()); + SleepABit(); + ASSERT_EQ(iterator_control->times_polled(), 5); + + // One more read should bring q down to 2 and that should restart + // but it will only read up to 6 because we hit end of stream + ASSERT_FINISHES_OK_AND_EQ(TestInt(2), generator()); + BusyWait(10, [&] { return iterator_control->times_polled() >= 7; }); + ASSERT_EQ(iterator_control->times_polled(), 7); + + for (int i = 3; i < 6; i++) { + ASSERT_FINISHES_OK_AND_EQ(TestInt(i), generator()); + } + + AssertGeneratorExhausted(generator); +} + struct SlowEmptyIterator { Result Next() { if (called_) { @@ -591,12 +804,18 @@ struct SlowEmptyIterator { bool called_ = false; }; -TEST(TestAsyncUtil, BackgroundRepeatEnd) { +TEST_P(BackgroundGeneratorTestFixture, BackgroundRepeatEnd) { // Ensure that the background generator properly fulfills the asyncgenerator contract // and can be called after it ends. ASSERT_OK_AND_ASSIGN(auto io_pool, internal::ThreadPool::Make(1)); - auto iterator = Iterator(SlowEmptyIterator()); + bool slow = GetParam(); + Iterator iterator; + if (slow) { + iterator = Iterator(SlowEmptyIterator()); + } else { + iterator = MakeEmptyIterator(); + } ASSERT_OK_AND_ASSIGN(auto background_gen, MakeBackgroundGenerator(std::move(iterator), io_pool.get())); @@ -604,20 +823,21 @@ TEST(TestAsyncUtil, BackgroundRepeatEnd) { MakeTransferredGenerator(std::move(background_gen), internal::GetCpuThreadPool()); auto one = background_gen(); - auto two = background_gen(); - ASSERT_FINISHES_OK_AND_ASSIGN(auto one_fin, one); ASSERT_TRUE(IsIterationEnd(one_fin)); + auto two = background_gen(); ASSERT_FINISHES_OK_AND_ASSIGN(auto two_fin, two); ASSERT_TRUE(IsIterationEnd(two_fin)); } -TEST(TestAsyncUtil, CompleteBackgroundStressTest) { - auto expected = RangeVector(20); +TEST_P(BackgroundGeneratorTestFixture, Stress) { + constexpr int NTASKS = 20; + constexpr int NITEMS = 20; + auto expected = RangeVector(NITEMS); std::vector>> futures; - for (unsigned int i = 0; i < 20; i++) { - auto background = BackgroundAsyncVectorIt(expected); + for (unsigned int i = 0; i < NTASKS; i++) { + auto background = Make(expected, /*max_q=*/4, /*q_restart=*/2); futures.push_back(CollectAsyncGenerator(background)); } auto combined = All(futures); @@ -628,6 +848,9 @@ TEST(TestAsyncUtil, CompleteBackgroundStressTest) { } } +INSTANTIATE_TEST_SUITE_P(BackgroundGeneratorTests, BackgroundGeneratorTestFixture, + ::testing::Values(false, true)); + TEST(TestAsyncUtil, SerialReadaheadSlowProducer) { AsyncGenerator gen = BackgroundAsyncVectorIt({1, 2, 3, 4, 5}); auto guard = ExpectNotAccessedReentrantly(&gen); diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 568cb1f5cd19..4d9e7b18290d 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -56,6 +56,11 @@ struct IterationTraits { static bool IsEnd(const T& val) { return val == End(); } }; +template +T IterationEnd() { + return IterationTraits::End(); +} + template bool IsIterationEnd(const T& val) { return IterationTraits::IsEnd(val); diff --git a/cpp/src/arrow/util/test_common.h b/cpp/src/arrow/util/test_common.h index 9e811e0a7fbe..8c304ffbbcfa 100644 --- a/cpp/src/arrow/util/test_common.h +++ b/cpp/src/arrow/util/test_common.h @@ -63,6 +63,20 @@ inline Iterator VectorIt(std::vector v) { return MakeVectorIterator(std::move(v)); } +template +inline Iterator PossiblySlowVectorIt(std::vector v, bool slow = false) { + auto iterator = MakeVectorIterator(std::move(v)); + if (slow) { + return MakeTransformedIterator(std::move(iterator), + [](T item) -> Result> { + SleepABit(); + return TransformYield(item); + }); + } else { + return iterator; + } +} + template inline void AssertIteratorExhausted(Iterator& it) { ASSERT_OK_AND_ASSIGN(T next, it.Next());