diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index db98243267b..f034cea9983 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -640,6 +640,7 @@ class SerialReadaheadGenerator { std::shared_ptr state_; }; +/// \see MakeFromFuture template class FutureFirstGenerator { public: @@ -669,6 +670,12 @@ class FutureFirstGenerator { std::shared_ptr state_; }; +/// \brief Transforms a Future> into an AsyncGenerator +/// that waits for the future to complete as part of the first item. +/// +/// This generator is not async-reentrant (even if the generator yielded by future is) +/// +/// This generator does not queue template AsyncGenerator MakeFromFuture(Future> future) { return FutureFirstGenerator(std::move(future)); diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 518422de586..51e4f948d38 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -222,8 +222,7 @@ class GeneratorTestFixture : public ::testing::TestWithParam { AsyncGenerator MakeSource(const std::vector& items) { std::vector wrapped(items.begin(), items.end()); auto gen = AsyncVectorIt(std::move(wrapped)); - bool slow = GetParam(); - if (slow) { + if (IsSlow()) { return SlowdownABit(std::move(gen)); } return gen; @@ -233,22 +232,22 @@ class GeneratorTestFixture : public ::testing::TestWithParam { AsyncGenerator gen = [] { return Future::MakeFinished(Status::Invalid("XYZ")); }; - bool slow = GetParam(); - if (slow) { + if (IsSlow()) { return SlowdownABit(std::move(gen)); } return gen; } int GetNumItersForStress() { - bool slow = GetParam(); // Run fewer trials for the slow case since they take longer - if (slow) { + if (IsSlow()) { return 10; } else { return 100; } } + + bool IsSlow() { return GetParam(); } }; template @@ -461,6 +460,30 @@ TEST(TestAsyncUtil, Concatenated) { AssertAsyncGeneratorMatch(expected, concat); } +class FromFutureFixture : public GeneratorTestFixture {}; + +TEST_P(FromFutureFixture, Basic) { + auto source = Future>::MakeFinished(RangeVector(3)); + if (IsSlow()) { + source = SleepABitAsync().Then( + [](...) -> Result> { return RangeVector(3); }); + } + auto slow = IsSlow(); + auto to_gen = source.Then([slow](const std::vector& vec) { + auto vec_gen = MakeVectorGenerator(vec); + if (slow) { + return SlowdownABit(std::move(vec_gen)); + } + return vec_gen; + }); + auto gen = MakeFromFuture(std::move(to_gen)); + auto collected = CollectAsyncGenerator(std::move(gen)); + ASSERT_FINISHES_OK_AND_EQ(RangeVector(3), collected); +} + +INSTANTIATE_TEST_SUITE_P(FromFutureTests, FromFutureFixture, + ::testing::Values(false, true)); + class MergedGeneratorTestFixture : public GeneratorTestFixture {}; TEST_P(MergedGeneratorTestFixture, Merged) {