diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 81dc3e55072..23942ec37da 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -441,7 +441,8 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this ScanBatchesAsync(Executor* executor); Future<> VisitBatchesAsync(std::function visitor, Executor* executor); - Result ScanBatchesUnorderedAsync(Executor* executor); + Result ScanBatchesUnorderedAsync( + Executor* executor, bool sequence_fragments = false); Future> ToTableAsync(Executor* executor); Result GetFragments() const; @@ -587,7 +588,7 @@ Result ToEnumeratedRecordBatch( } Result AsyncScanner::ScanBatchesUnorderedAsync( - Executor* cpu_executor) { + Executor* cpu_executor, bool sequence_fragments) { if (!scan_options_->use_threads) { cpu_executor = nullptr; } @@ -608,7 +609,8 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( RETURN_NOT_OK( compute::Declaration::Sequence( { - {"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle}}, + {"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle, + sequence_fragments}}, {"filter", compute::FilterNodeOptions{scan_options_->filter}}, {"augmented_project", compute::ProjectNodeOptions{std::move(exprs), std::move(names)}}, @@ -649,7 +651,8 @@ Result AsyncScanner::ScanBatchesAsync() { Result AsyncScanner::ScanBatchesAsync( Executor* cpu_executor) { - ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync(cpu_executor)); + ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync( + cpu_executor, /*sequence_fragments=*/true)); // We need an initial value sentinel, so we use one with fragment.index < 0 auto is_before_any = [](const EnumeratedRecordBatch& batch) { return batch.fragment.index < 0; @@ -1143,6 +1146,7 @@ Result MakeScanNode(compute::ExecPlan* plan, auto scan_options = scan_node_options.scan_options; auto dataset = scan_node_options.dataset; const auto& backpressure_toggle = 
scan_node_options.backpressure_toggle; + bool require_sequenced_output = scan_node_options.require_sequenced_output; if (!scan_options->use_async) { return Status::NotImplemented("ScanNodes without asynchrony"); @@ -1175,8 +1179,15 @@ Result MakeScanNode(compute::ExecPlan* plan, ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen, FragmentsToBatches(std::move(fragment_gen), scan_options)); - auto merged_batch_gen = - MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead); + AsyncGenerator merged_batch_gen; + if (require_sequenced_output) { + ARROW_ASSIGN_OR_RAISE(merged_batch_gen, + MakeSequencedMergedGenerator(std::move(batch_gen_gen), + scan_options->fragment_readahead)); + } else { + merged_batch_gen = + MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead); + } auto batch_gen = MakeReadaheadGenerator(std::move(merged_batch_gen), scan_options->fragment_readahead); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 78746068d87..75e9806fb8f 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -421,14 +421,17 @@ class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions { public: explicit ScanNodeOptions( std::shared_ptr dataset, std::shared_ptr scan_options, - std::shared_ptr backpressure_toggle = NULLPTR) + std::shared_ptr backpressure_toggle = NULLPTR, + bool require_sequenced_output = false) : dataset(std::move(dataset)), scan_options(std::move(scan_options)), - backpressure_toggle(std::move(backpressure_toggle)) {} + backpressure_toggle(std::move(backpressure_toggle)), + require_sequenced_output(require_sequenced_output) {} std::shared_ptr dataset; std::shared_ptr scan_options; std::shared_ptr backpressure_toggle; + bool require_sequenced_output; }; /// @} diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 40a0e005a3f..83151de2c02 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ 
b/cpp/src/arrow/dataset/scanner_test.cc @@ -1027,12 +1027,12 @@ class TestBackpressure : public ::testing::Test { return sum; } - void Finish(AsyncGenerator gen) { + template + void Finish(AsyncGenerator gen) { for (const auto& controlled_fragment : controlled_fragments_) { controlled_fragment->Finish(); } - ASSERT_FINISHES_OK(VisitAsyncGenerator( - gen, [](EnumeratedRecordBatch batch) { return Status::OK(); })); + ASSERT_FINISHES_OK(VisitAsyncGenerator(gen, [](T batch) { return Status::OK(); })); } std::shared_ptr schema_ = schema({field("values", int32())}); @@ -1063,6 +1063,24 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) { Finish(std::move(gen)); } +TEST_F(TestBackpressure, ScanBatchesOrdered) { + std::shared_ptr scanner = MakeScanner(); + EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, + scanner->ScanBatchesAsync()); + // This future never actually finishes because we only emit the first batch so far and + // the scanner delays by one batch. It is enough to start the system pumping though so + // we don't need it to finish. + Future fut = gen(); + + // See note on other test + GetCpuThreadPool()->WaitForIdle(); + // Worst case we read in the entire set of initial batches + ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1); + + DeliverAdditionalBatches(); + Finish(std::move(gen)); +} + struct BatchConsumer { explicit BatchConsumer(EnumeratedRecordBatchGenerator generator) : generator(std::move(generator)), next() {} diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index d2a2339f5bc..0948e5537fe 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -111,14 +111,14 @@ Future<> VisitAsyncGenerator(AsyncGenerator generator, Visitor visitor) { return Loop(LoopBody{std::move(generator), std::move(visitor)}); } -/// \brief Waits for an async generator to complete, discarding results. +/// \brief Wait for an async generator to complete, discarding results. 
template Future<> DiscardAllFromAsyncGenerator(AsyncGenerator generator) { std::function visitor = [](const T&) { return Status::OK(); }; return VisitAsyncGenerator(generator, visitor); } -/// \brief Collects the results of an async generator into a vector +/// \brief Collect the results of an async generator into a vector template Future> CollectAsyncGenerator(AsyncGenerator generator) { auto vec = std::make_shared>(); @@ -258,7 +258,7 @@ class MappingGenerator { std::shared_ptr state_; }; -/// \brief Creates a generator that will apply the map function to each element of +/// \brief Create a generator that will apply the map function to each element of /// source. The map function is not called on the end token. /// /// Note: This function makes a copy of `map` for each item @@ -278,7 +278,7 @@ AsyncGenerator MakeMappedGenerator(AsyncGenerator source_generator, MapFn return MappingGenerator(std::move(source_generator), MapCallback{std::move(map)}); } -/// \brief Creates a generator that will apply the map function to +/// \brief Create a generator that will apply the map function to /// each element of source. The map function is not called on the end /// token. The result of the map function should be another /// generator; all these generators will then be flattened to produce @@ -417,7 +417,7 @@ class SequencingGenerator { const std::shared_ptr state_; }; -/// \brief Buffers an AsyncGenerator to return values in sequence order ComesAfter +/// \brief Buffer an AsyncGenerator to return values in sequence order. ComesAfter /// and IsNext determine the sequence order. 
/// ComesAfter should be a BinaryPredicate that only returns true if a comes after b @@ -532,7 +532,7 @@ class TransformingGenerator { std::shared_ptr state_; }; -/// \brief Transforms an async generator using a transformer function returning a new +/// \brief Transform an async generator using a transformer function returning a new /// AsyncGenerator /// /// The transform function here behaves exactly the same as the transform function in @@ -686,7 +686,7 @@ class FutureFirstGenerator { std::shared_ptr state_; }; -/// \brief Transforms a Future> into an AsyncGenerator +/// \brief Transform a Future> into an AsyncGenerator /// that waits for the future to complete as part of the first item. /// /// This generator is not async-reentrant (even if the generator yielded by future is) @@ -697,7 +697,7 @@ AsyncGenerator MakeFromFuture(Future> future) { return FutureFirstGenerator(std::move(future)); } -/// \brief Creates a generator that will pull from the source into a queue. Unlike +/// \brief Create a generator that will pull from the source into a queue. Unlike /// MakeReadaheadGenerator this will not pull reentrantly from the source. /// /// The source generator does not need to be async-reentrant @@ -711,6 +711,35 @@ AsyncGenerator MakeSerialReadaheadGenerator(AsyncGenerator source_generato return SerialReadaheadGenerator(std::move(source_generator), max_readahead); } +/// \brief Create a generator that immediately pulls from the source +/// +/// Typical generators do not pull from their source until they themselves +/// are pulled. This generator does not follow that convention and will call +/// generator() once before it returns. The returned generator will otherwise +/// mirror the source. +/// +/// This generator forwards async-reentrant pressure to the source +/// This generator buffers one item (the first result) until it is delivered. 
+template +AsyncGenerator MakeAutoStartingGenerator(AsyncGenerator generator) { + struct AutostartGenerator { + Future operator()() { + if (first_future->is_valid()) { + Future result = *first_future; + *first_future = Future(); + return result; + } + return source(); + } + + std::shared_ptr> first_future; + AsyncGenerator source; + }; + + std::shared_ptr> first_future = std::make_shared>(generator()); + return AutostartGenerator{std::move(first_future), std::move(generator)}; +} + /// \see MakeReadaheadGenerator template class ReadaheadGenerator { @@ -919,7 +948,7 @@ class PushGenerator { const std::shared_ptr state_; }; -/// \brief Creates a generator that pulls reentrantly from a source +/// \brief Create a generator that pulls reentrantly from a source /// This generator will pull reentrantly from a source, ensuring that max_readahead /// requests are active at any given time. /// @@ -1137,7 +1166,7 @@ class MergedGenerator { std::shared_ptr state_; }; -/// \brief Creates a generator that takes in a stream of generators and pulls from up to +/// \brief Create a generator that takes in a stream of generators and pulls from up to /// max_subscriptions at a time /// /// Note: This may deliver items out of sequence. 
For example, items from the third @@ -1156,7 +1185,24 @@ AsyncGenerator MakeMergedGenerator(AsyncGenerator> source, return MergedGenerator(std::move(source), max_subscriptions); } -/// \brief Creates a generator that takes in a stream of generators and pulls from each +template +Result> MakeSequencedMergedGenerator( + AsyncGenerator> source, int max_subscriptions) { + if (max_subscriptions < 0) { + return Status::Invalid("max_subscriptions must be a positive integer"); + } + if (max_subscriptions == 1) { + return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1"); + } + AsyncGenerator> autostarting_source = MakeMappedGenerator( + std::move(source), + [](const AsyncGenerator& sub) { return MakeAutoStartingGenerator(sub); }); + AsyncGenerator> sub_readahead = + MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1); + return MakeConcatenatedGenerator(std::move(sub_readahead)); +} + +/// \brief Create a generator that takes in a stream of generators and pulls from each /// one in sequence. /// /// This generator is async-reentrant but will never pull from source reentrantly and @@ -1224,7 +1270,7 @@ class EnumeratingGenerator { std::shared_ptr state_; }; -/// Wraps items from a source generator with positional information +/// Wrap items from a source generator with positional information /// /// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be /// processed in a "first-available" fashion and later resequenced which can reduce the @@ -1260,7 +1306,7 @@ class TransferringGenerator { internal::Executor* executor_; }; -/// \brief Transfers a future to an underlying executor. +/// \brief Transfer a future to an underlying executor. /// /// Continuations run on the returned future will be run on the given executor /// if they cannot be run synchronously. 
@@ -1506,7 +1552,7 @@ class BackgroundGenerator { constexpr int kDefaultBackgroundMaxQ = 32; constexpr int kDefaultBackgroundQRestart = 16; -/// \brief Creates an AsyncGenerator by iterating over an Iterator on a background +/// \brief Create an AsyncGenerator by iterating over an Iterator on a background /// thread /// /// The parameter max_q and q_restart control queue size and background thread task @@ -1554,14 +1600,14 @@ class GeneratorIterator { AsyncGenerator source_; }; -/// \brief Converts an AsyncGenerator to an Iterator by blocking until each future +/// \brief Convert an AsyncGenerator to an Iterator which blocks until each future /// is finished template Iterator MakeGeneratorIterator(AsyncGenerator source) { return Iterator(GeneratorIterator(std::move(source))); } -/// \brief Adds readahead to an iterator using a background thread. +/// \brief Add readahead to an iterator using a background thread. /// /// Under the hood this is converting the iterator to a generator using /// MakeBackgroundGenerator, adding readahead to the converted generator with @@ -1633,7 +1679,7 @@ AsyncGenerator MakeFailingGenerator(const Result& result) { return MakeFailingGenerator(result.status()); } -/// \brief Prepends initial_values onto a generator +/// \brief Prepend initial_values onto a generator /// /// This generator is async-reentrant but will buffer requests and will not /// pull from following_values async-reentrantly. 
@@ -1659,7 +1705,7 @@ struct CancellableGenerator { StopToken stop_token; }; -/// \brief Allows an async generator to be cancelled +/// \brief Allow an async generator to be cancelled /// /// This generator is async-reentrant template @@ -1698,7 +1744,7 @@ struct PauseableGenerator { std::shared_ptr state_; }; -/// \brief Allows an async generator to be paused +/// \brief Allow an async generator to be paused /// /// This generator is NOT async-reentrant and calling it in an async-reentrant fashion /// may lead to items getting reordered (and potentially truncated if the end token is diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 22f55d5cb20..67c6e201d60 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -668,6 +668,180 @@ TEST_P(MergedGeneratorTestFixture, MergedRecursion) { INSTANTIATE_TEST_SUITE_P(MergedGeneratorTests, MergedGeneratorTestFixture, ::testing::Values(false, true)); +class AutoStartingGeneratorTestFixture : public GeneratorTestFixture {}; + +TEST_P(AutoStartingGeneratorTestFixture, Basic) { + AsyncGenerator source = MakeSource({1, 2, 3}); + util::TrackingGenerator tracked(source); + AsyncGenerator gen = + MakeAutoStartingGenerator(static_cast>(tracked)); + ASSERT_EQ(1, tracked.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(1), gen()); + ASSERT_EQ(1, tracked.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(2), gen()); + ASSERT_EQ(2, tracked.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(3), gen()); + ASSERT_EQ(3, tracked.num_read()); + AssertGeneratorExhausted(gen); +} + +TEST_P(AutoStartingGeneratorTestFixture, CopySafe) { + AsyncGenerator source = MakeSource({1, 2, 3}); + AsyncGenerator gen = MakeAutoStartingGenerator(std::move(source)); + AsyncGenerator copy = gen; + ASSERT_FINISHES_OK_AND_EQ(TestInt(1), gen()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(2), copy()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(3), gen()); + 
AssertGeneratorExhausted(gen); + AssertGeneratorExhausted(copy); +} + +INSTANTIATE_TEST_SUITE_P(AutoStartingGeneratorTests, AutoStartingGeneratorTestFixture, + ::testing::Values(false, true)); + +class SeqMergedGeneratorTestFixture : public ::testing::Test { + protected: + SeqMergedGeneratorTestFixture() : tracked_source_(push_source_) {} + + void BeginCaptureOutput(AsyncGenerator gen) { + finished_ = VisitAsyncGenerator(std::move(gen), [this](TestInt val) { + sink_.push_back(val.value); + return Status::OK(); + }); + } + + void EmitItem(int sub_index, int value) { + EXPECT_LT(sub_index, push_subs_.size()); + push_subs_[sub_index].producer().Push(value); + } + + void EmitErrorItem(int sub_index) { + EXPECT_LT(sub_index, push_subs_.size()); + push_subs_[sub_index].producer().Push(Status::Invalid("XYZ")); + } + + void EmitSub() { + PushGenerator sub; + util::TrackingGenerator tracked_sub(sub); + tracked_subs_.push_back(tracked_sub); + push_subs_.push_back(std::move(sub)); + push_source_.producer().Push(std::move(tracked_sub)); + } + + void EmitErrorSub() { push_source_.producer().Push(Status::Invalid("XYZ")); } + + void FinishSub(int sub_index) { + EXPECT_LT(sub_index, tracked_subs_.size()); + push_subs_[sub_index].producer().Close(); + } + + void FinishSubs() { push_source_.producer().Close(); } + + void AssertFinishedOk() { ASSERT_FINISHES_OK(finished_); } + + void AssertFailed() { ASSERT_FINISHES_AND_RAISES(Invalid, finished_); } + + int NumItemsAskedFor(int sub_index) { + EXPECT_LT(sub_index, tracked_subs_.size()); + return tracked_subs_[sub_index].num_read(); + } + + int NumSubsAskedFor() { return tracked_source_.num_read(); } + + void AssertRead(std::vector values) { + ASSERT_EQ(values.size(), sink_.size()); + for (std::size_t i = 0; i < sink_.size(); i++) { + ASSERT_EQ(values[i], sink_[i]); + } + } + + PushGenerator> push_source_; + std::vector> push_subs_; + std::vector> tracked_subs_; + util::TrackingGenerator> tracked_source_; + Future<> finished_; + 
std::vector sink_; +}; + +TEST_F(SeqMergedGeneratorTestFixture, Basic) { + ASSERT_OK_AND_ASSIGN( + AsyncGenerator gen, + MakeSequencedMergedGenerator( + static_cast>>(tracked_source_), 4)); + // Should not initially ask for anything + ASSERT_EQ(0, NumSubsAskedFor()); + BeginCaptureOutput(gen); + // Should not read ahead async-reentrantly from source + ASSERT_EQ(1, NumSubsAskedFor()); + EmitSub(); + ASSERT_EQ(2, NumSubsAskedFor()); + // Should immediately start polling + ASSERT_EQ(1, NumItemsAskedFor(0)); + EmitSub(); + EmitSub(); + EmitSub(); + EmitSub(); + // Should limit how many subs it reads ahead + ASSERT_EQ(4, NumSubsAskedFor()); + // Should immediately start polling subs even if they aren't yet active + ASSERT_EQ(1, NumItemsAskedFor(1)); + ASSERT_EQ(1, NumItemsAskedFor(2)); + ASSERT_EQ(1, NumItemsAskedFor(3)); + // Items emitted on non-active subs should not be delivered and should not trigger + // further polling on the inactive sub + EmitItem(1, 0); + ASSERT_EQ(1, NumItemsAskedFor(1)); + AssertRead({}); + EmitItem(0, 1); + AssertRead({1}); + ASSERT_EQ(2, NumItemsAskedFor(0)); + EmitItem(0, 2); + AssertRead({1, 2}); + ASSERT_EQ(3, NumItemsAskedFor(0)); + // On finish it should move to the next sub and pull 1 item + FinishSub(0); + ASSERT_EQ(5, NumSubsAskedFor()); + ASSERT_EQ(2, NumItemsAskedFor(1)); + AssertRead({1, 2, 0}); + // Now finish all the subs and make sure an empty sub is ok + FinishSub(1); + FinishSub(2); + FinishSub(3); + FinishSub(4); + ASSERT_EQ(6, NumSubsAskedFor()); + FinishSubs(); + AssertFinishedOk(); +} + +TEST_F(SeqMergedGeneratorTestFixture, ErrorItem) { + ASSERT_OK_AND_ASSIGN( + AsyncGenerator gen, + MakeSequencedMergedGenerator( + static_cast>>(tracked_source_), 4)); + BeginCaptureOutput(gen); + EmitSub(); + EmitSub(); + EmitErrorItem(1); + // It will still read from the active sub and won't notice the error until it switches + // to the failing sub + EmitItem(0, 0); + AssertRead({0}); + FinishSub(0); + AssertFailed(); +} + 
+TEST_F(SeqMergedGeneratorTestFixture, ErrorSub) { + ASSERT_OK_AND_ASSIGN( + AsyncGenerator gen, + MakeSequencedMergedGenerator( + static_cast>>(tracked_source_), 4)); + BeginCaptureOutput(gen); + EmitSub(); + EmitErrorSub(); + FinishSub(0); + AssertFailed(); +} + TEST(TestAsyncUtil, FromVector) { AsyncGenerator gen; {