From f6bd16a792f5fd955ea1c936af547610496cadf6 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 1 Oct 2021 16:43:35 -1000 Subject: [PATCH 1/6] ARROW-14192: Added a merge generator that will still allow some file readahead but won't allow backpressure to explode if scanning in a sequenced fashion. --- cpp/src/arrow/dataset/scanner.cc | 35 ++-- cpp/src/arrow/dataset/scanner.h | 7 +- cpp/src/arrow/util/async_generator.h | 37 +++++ cpp/src/arrow/util/async_generator_test.cc | 176 +++++++++++++++++++++ 4 files changed, 238 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 81dc3e55072..a07154cc040 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -441,7 +441,7 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this ScanBatchesAsync(Executor* executor); Future<> VisitBatchesAsync(std::function visitor, Executor* executor); - Result ScanBatchesUnorderedAsync(Executor* executor); + Result ScanBatchesUnorderedAsync(Executor* executor, bool sequence_fragments = false); Future> ToTableAsync(Executor* executor); Result GetFragments() const; @@ -587,7 +587,7 @@ Result ToEnumeratedRecordBatch( } Result AsyncScanner::ScanBatchesUnorderedAsync( - Executor* cpu_executor) { + Executor* cpu_executor, bool sequence_fragments) { if (!scan_options_->use_threads) { cpu_executor = nullptr; } @@ -605,16 +605,15 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( scan_options_->projection.call()->options.get()) ->field_names; - RETURN_NOT_OK( - compute::Declaration::Sequence( - { - {"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle}}, - {"filter", compute::FilterNodeOptions{scan_options_->filter}}, - {"augmented_project", - compute::ProjectNodeOptions{std::move(exprs), std::move(names)}}, - {"sink", compute::SinkNodeOptions{&sink_gen, std::move(backpressure)}}, - }) - .AddToPlan(plan.get())); + RETURN_NOT_OK(compute::Declaration::Sequence( + { + 
{"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle, sequence_fragments}}, + {"filter", compute::FilterNodeOptions{scan_options_->filter}}, + {"augmented_project", + compute::ProjectNodeOptions{std::move(exprs), std::move(names)}}, + {"sink", compute::SinkNodeOptions{&sink_gen, std::move(backpressure)}}, + }) + .AddToPlan(plan.get())); RETURN_NOT_OK(plan->StartProducing()); @@ -649,7 +648,7 @@ Result AsyncScanner::ScanBatchesAsync() { Result AsyncScanner::ScanBatchesAsync( Executor* cpu_executor) { - ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync(cpu_executor)); + ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync(cpu_executor, /*sequence_fragments=*/true)); // We need an initial value sentinel, so we use one with fragment.index < 0 auto is_before_any = [](const EnumeratedRecordBatch& batch) { return batch.fragment.index < 0; @@ -1143,6 +1142,7 @@ Result MakeScanNode(compute::ExecPlan* plan, auto scan_options = scan_node_options.scan_options; auto dataset = scan_node_options.dataset; const auto& backpressure_toggle = scan_node_options.backpressure_toggle; + bool require_sequenced_output = scan_node_options.require_sequenced_output; if (!scan_options->use_async) { return Status::NotImplemented("ScanNodes without asynchrony"); @@ -1175,8 +1175,13 @@ Result MakeScanNode(compute::ExecPlan* plan, ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen, FragmentsToBatches(std::move(fragment_gen), scan_options)); - auto merged_batch_gen = - MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead); + AsyncGenerator merged_batch_gen; + if (require_sequenced_output) { + ARROW_ASSIGN_OR_RAISE(merged_batch_gen, MakeSequencedMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead)); + } else { + merged_batch_gen = + MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead); + } auto batch_gen = MakeReadaheadGenerator(std::move(merged_batch_gen), scan_options->fragment_readahead); 
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 78746068d87..75e9806fb8f 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -421,14 +421,17 @@ class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions { public: explicit ScanNodeOptions( std::shared_ptr dataset, std::shared_ptr scan_options, - std::shared_ptr backpressure_toggle = NULLPTR) + std::shared_ptr backpressure_toggle = NULLPTR, + bool require_sequenced_output = false) : dataset(std::move(dataset)), scan_options(std::move(scan_options)), - backpressure_toggle(std::move(backpressure_toggle)) {} + backpressure_toggle(std::move(backpressure_toggle)), + require_sequenced_output(require_sequenced_output) {} std::shared_ptr dataset; std::shared_ptr scan_options; std::shared_ptr backpressure_toggle; + bool require_sequenced_output; }; /// @} diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index d2a2339f5bc..8c457dd5544 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -711,6 +712,27 @@ AsyncGenerator MakeSerialReadaheadGenerator(AsyncGenerator source_generato return SerialReadaheadGenerator(std::move(source_generator), max_readahead); } +template +AsyncGenerator MakeAutoStartingGenerator(AsyncGenerator generator) { + struct AutostartGenerator { + + Future operator()() { + if (first_future->is_valid()) { + Future result = *first_future; + *first_future = Future(); + return result; + } + return source(); + } + + std::shared_ptr> first_future; + AsyncGenerator source; + }; + + std::shared_ptr> first_future = std::make_shared>(generator()); + return AutostartGenerator{std::move(first_future), std::move(generator)}; +} + /// \see MakeReadaheadGenerator template class ReadaheadGenerator { @@ -1156,6 +1178,21 @@ AsyncGenerator MakeMergedGenerator(AsyncGenerator> source, return 
MergedGenerator(std::move(source), max_subscriptions); } +template +Result> MakeSequencedMergedGenerator(AsyncGenerator> source, int max_subscriptions) { + if (max_subscriptions < 0) { + return Status::Invalid("max_subscriptions must be a positive integer"); + } + if (max_subscriptions == 1) { + return Status::Invalid("Use MakeConcatenatedGenerator is max_subscriptions is 1"); + } + AsyncGenerator> autostarting_source = MakeMappedGenerator(std::move(source), [] (const AsyncGenerator& sub) { + return MakeAutoStartingGenerator(sub); + }); + AsyncGenerator> sub_readahead = MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1); + return MakeConcatenatedGenerator(std::move(sub_readahead)); +} + /// \brief Creates a generator that takes in a stream of generators and pulls from each /// one in sequence. /// diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 22f55d5cb20..0497cc8be40 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -668,6 +668,182 @@ TEST_P(MergedGeneratorTestFixture, MergedRecursion) { INSTANTIATE_TEST_SUITE_P(MergedGeneratorTests, MergedGeneratorTestFixture, ::testing::Values(false, true)); +class AutoStartingGeneratorTestFixture : public GeneratorTestFixture { +}; + +TEST_P(AutoStartingGeneratorTestFixture, Basic) { + AsyncGenerator source = MakeSource({1, 2, 3}); + TrackingGenerator tracked(source); + AsyncGenerator gen = MakeAutoStartingGenerator(static_cast>(tracked)); + ASSERT_EQ(1, tracked.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(1), gen()); + ASSERT_EQ(1, tracked.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(2), gen()); + ASSERT_EQ(2, tracked.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(3), gen()); + ASSERT_EQ(3, tracked.num_read()); + AssertGeneratorExhausted(gen); +} + +TEST_P(AutoStartingGeneratorTestFixture, CopySafe) { + AsyncGenerator source = MakeSource({1, 2, 3}); + AsyncGenerator 
gen = MakeAutoStartingGenerator(std::move(source)); + AsyncGenerator copy = gen; + ASSERT_FINISHES_OK_AND_EQ(TestInt(1), gen()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(2), copy()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(3), gen()); + AssertGeneratorExhausted(gen); + AssertGeneratorExhausted(copy); +} + +INSTANTIATE_TEST_SUITE_P(AutoStartingGeneratorTests, AutoStartingGeneratorTestFixture, + ::testing::Values(false, true)); + +class SeqMergedGeneratorTestFixture : public ::testing::Test { + + protected: + SeqMergedGeneratorTestFixture() : tracked_source_(push_source_) {} + + void BeginCaptureOutput(AsyncGenerator gen) { + finished_ = VisitAsyncGenerator(std::move(gen), [this] (TestInt val) { + sink_.push_back(val.value); + return Status::OK(); + }); + } + + void EmitItem(int sub_index, int value) { + EXPECT_LT(sub_index, push_subs_.size()); + push_subs_[sub_index].producer().Push(value); + } + + void EmitErrorItem(int sub_index) { + EXPECT_LT(sub_index, push_subs_.size()); + push_subs_[sub_index].producer().Push(Status::Invalid("XYZ")); + } + + void EmitSub() { + PushGenerator sub; + TrackingGenerator tracked_sub(sub); + tracked_subs_.push_back(tracked_sub); + push_subs_.push_back(std::move(sub)); + push_source_.producer().Push(std::move(tracked_sub)); + } + + void EmitErrorSub() { + push_source_.producer().Push(Status::Invalid("XYZ")); + } + + void FinishSub(int sub_index) { + EXPECT_LT(sub_index, tracked_subs_.size()); + push_subs_[sub_index].producer().Close(); + } + + void FinishSubs() { + push_source_.producer().Close(); + } + + void AssertFinishedOk() { + ASSERT_FINISHES_OK(finished_); + } + + void AssertFailed() { + ASSERT_FINISHES_AND_RAISES(Invalid, finished_); + } + + int NumItemsAskedFor(int sub_index) { + EXPECT_LT(sub_index, tracked_subs_.size()); + return tracked_subs_[sub_index].num_read(); + } + + int NumSubsAskedFor() { + return tracked_source_.num_read(); + } + + void AssertRead(std::vector values) { + ASSERT_EQ(values.size(), sink_.size()); + for 
(std::size_t i = 0; i < sink_.size(); i++) { + ASSERT_EQ(values[i], sink_[i]); + } + } + + PushGenerator> push_source_; + std::vector> push_subs_; + std::vector> tracked_subs_; + TrackingGenerator> tracked_source_; + Future<> finished_; + std::vector sink_; + +}; + +TEST_F(SeqMergedGeneratorTestFixture, Basic) { + ASSERT_OK_AND_ASSIGN(AsyncGenerator gen, MakeSequencedMergedGenerator(static_cast>>(tracked_source_), 4)); + // Should not initially ask for anything + ASSERT_EQ(0, NumSubsAskedFor()); + BeginCaptureOutput(gen); + // Should not read ahead async-reentrantly from source + ASSERT_EQ(1, NumSubsAskedFor()); + EmitSub(); + ASSERT_EQ(2, NumSubsAskedFor()); + // Should immediately start polling + ASSERT_EQ(1, NumItemsAskedFor(0)); + EmitSub(); + EmitSub(); + EmitSub(); + EmitSub(); + // Should limit how many subs it reads ahead + ASSERT_EQ(4, NumSubsAskedFor()); + // Should immediately start polling subs even if they aren't yet active + ASSERT_EQ(1, NumItemsAskedFor(1)); + ASSERT_EQ(1, NumItemsAskedFor(2)); + ASSERT_EQ(1, NumItemsAskedFor(3)); + // Items emitted on non-active subs should not be delivered and should not trigger + // further polling on the inactive sub + EmitItem(1, 0); + ASSERT_EQ(1, NumItemsAskedFor(1)); + AssertRead({}); + EmitItem(0, 1); + AssertRead({1}); + ASSERT_EQ(2, NumItemsAskedFor(0)); + EmitItem(0, 2); + AssertRead({1, 2}); + ASSERT_EQ(3, NumItemsAskedFor(0)); + // On finish it should move to the next sub and pull 1 item + FinishSub(0); + ASSERT_EQ(5, NumSubsAskedFor()); + ASSERT_EQ(2, NumItemsAskedFor(1)); + AssertRead({1, 2, 0}); + // Now finish all the subs and make sure an empty sub is ok + FinishSub(1); + FinishSub(2); + FinishSub(3); + FinishSub(4); + ASSERT_EQ(6, NumSubsAskedFor()); + FinishSubs(); + AssertFinishedOk(); +} + +TEST_F(SeqMergedGeneratorTestFixture, ErrorItem) { + ASSERT_OK_AND_ASSIGN(AsyncGenerator gen, MakeSequencedMergedGenerator(static_cast>>(tracked_source_), 4)); + BeginCaptureOutput(gen); + EmitSub(); + 
EmitSub(); + EmitErrorItem(1); + // It will still read from the active sub and won't notice the error until it switches to the failing sub + EmitItem(0, 0); + AssertRead({0}); + FinishSub(0); + AssertFailed(); +} + +TEST_F(SeqMergedGeneratorTestFixture, ErrorSub) { + ASSERT_OK_AND_ASSIGN(AsyncGenerator gen, MakeSequencedMergedGenerator(static_cast>>(tracked_source_), 4)); + BeginCaptureOutput(gen); + EmitSub(); + EmitErrorSub(); + FinishSub(0); + AssertFailed(); +} + TEST(TestAsyncUtil, FromVector) { AsyncGenerator gen; { From 6d228f19dfc57e653d1b18071f07cc830f5b2938 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 12 Oct 2021 12:53:12 -1000 Subject: [PATCH 2/6] ARROW-14192: Lint and added a python test --- cpp/src/arrow/dataset/scanner.cc | 30 ++++++++------ cpp/src/arrow/util/async_generator.h | 14 +++---- cpp/src/arrow/util/async_generator_test.cc | 48 +++++++++++----------- python/pyarrow/tests/test_dataset.py | 36 ++++++++++++++++ 4 files changed, 84 insertions(+), 44 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index a07154cc040..23942ec37da 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -441,7 +441,8 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this ScanBatchesAsync(Executor* executor); Future<> VisitBatchesAsync(std::function visitor, Executor* executor); - Result ScanBatchesUnorderedAsync(Executor* executor, bool sequence_fragments = false); + Result ScanBatchesUnorderedAsync( + Executor* executor, bool sequence_fragments = false); Future> ToTableAsync(Executor* executor); Result GetFragments() const; @@ -605,15 +606,17 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( scan_options_->projection.call()->options.get()) ->field_names; - RETURN_NOT_OK(compute::Declaration::Sequence( - { - {"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle, sequence_fragments}}, - {"filter", 
compute::FilterNodeOptions{scan_options_->filter}}, - {"augmented_project", - compute::ProjectNodeOptions{std::move(exprs), std::move(names)}}, - {"sink", compute::SinkNodeOptions{&sink_gen, std::move(backpressure)}}, - }) - .AddToPlan(plan.get())); + RETURN_NOT_OK( + compute::Declaration::Sequence( + { + {"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle, + sequence_fragments}}, + {"filter", compute::FilterNodeOptions{scan_options_->filter}}, + {"augmented_project", + compute::ProjectNodeOptions{std::move(exprs), std::move(names)}}, + {"sink", compute::SinkNodeOptions{&sink_gen, std::move(backpressure)}}, + }) + .AddToPlan(plan.get())); RETURN_NOT_OK(plan->StartProducing()); @@ -648,7 +651,8 @@ Result AsyncScanner::ScanBatchesAsync() { Result AsyncScanner::ScanBatchesAsync( Executor* cpu_executor) { - ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync(cpu_executor, /*sequence_fragments=*/true)); + ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync( + cpu_executor, /*sequence_fragments=*/true)); // We need an initial value sentinel, so we use one with fragment.index < 0 auto is_before_any = [](const EnumeratedRecordBatch& batch) { return batch.fragment.index < 0; @@ -1177,7 +1181,9 @@ Result MakeScanNode(compute::ExecPlan* plan, AsyncGenerator merged_batch_gen; if (require_sequenced_output) { - ARROW_ASSIGN_OR_RAISE(merged_batch_gen, MakeSequencedMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead)); + ARROW_ASSIGN_OR_RAISE(merged_batch_gen, + MakeSequencedMergedGenerator(std::move(batch_gen_gen), + scan_options->fragment_readahead)); } else { merged_batch_gen = MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead); diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 8c457dd5544..c9fa215eb7b 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -21,7 +21,6 @@ #include #include #include 
-#include #include #include @@ -715,7 +714,6 @@ AsyncGenerator MakeSerialReadaheadGenerator(AsyncGenerator source_generato template AsyncGenerator MakeAutoStartingGenerator(AsyncGenerator generator) { struct AutostartGenerator { - Future operator()() { if (first_future->is_valid()) { Future result = *first_future; @@ -1179,17 +1177,19 @@ AsyncGenerator MakeMergedGenerator(AsyncGenerator> source, } template -Result> MakeSequencedMergedGenerator(AsyncGenerator> source, int max_subscriptions) { +Result> MakeSequencedMergedGenerator( + AsyncGenerator> source, int max_subscriptions) { if (max_subscriptions < 0) { return Status::Invalid("max_subscriptions must be a positive integer"); } if (max_subscriptions == 1) { return Status::Invalid("Use MakeConcatenatedGenerator is max_subscriptions is 1"); } - AsyncGenerator> autostarting_source = MakeMappedGenerator(std::move(source), [] (const AsyncGenerator& sub) { - return MakeAutoStartingGenerator(sub); - }); - AsyncGenerator> sub_readahead = MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1); + AsyncGenerator> autostarting_source = MakeMappedGenerator( + std::move(source), + [](const AsyncGenerator& sub) { return MakeAutoStartingGenerator(sub); }); + AsyncGenerator> sub_readahead = + MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1); return MakeConcatenatedGenerator(std::move(sub_readahead)); } diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 0497cc8be40..d0ca021b061 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -668,13 +668,13 @@ TEST_P(MergedGeneratorTestFixture, MergedRecursion) { INSTANTIATE_TEST_SUITE_P(MergedGeneratorTests, MergedGeneratorTestFixture, ::testing::Values(false, true)); -class AutoStartingGeneratorTestFixture : public GeneratorTestFixture { -}; +class AutoStartingGeneratorTestFixture : public GeneratorTestFixture {}; 
TEST_P(AutoStartingGeneratorTestFixture, Basic) { AsyncGenerator source = MakeSource({1, 2, 3}); TrackingGenerator tracked(source); - AsyncGenerator gen = MakeAutoStartingGenerator(static_cast>(tracked)); + AsyncGenerator gen = + MakeAutoStartingGenerator(static_cast>(tracked)); ASSERT_EQ(1, tracked.num_read()); ASSERT_FINISHES_OK_AND_EQ(TestInt(1), gen()); ASSERT_EQ(1, tracked.num_read()); @@ -700,12 +700,11 @@ INSTANTIATE_TEST_SUITE_P(AutoStartingGeneratorTests, AutoStartingGeneratorTestFi ::testing::Values(false, true)); class SeqMergedGeneratorTestFixture : public ::testing::Test { - protected: SeqMergedGeneratorTestFixture() : tracked_source_(push_source_) {} void BeginCaptureOutput(AsyncGenerator gen) { - finished_ = VisitAsyncGenerator(std::move(gen), [this] (TestInt val) { + finished_ = VisitAsyncGenerator(std::move(gen), [this](TestInt val) { sink_.push_back(val.value); return Status::OK(); }); @@ -729,35 +728,25 @@ class SeqMergedGeneratorTestFixture : public ::testing::Test { push_source_.producer().Push(std::move(tracked_sub)); } - void EmitErrorSub() { - push_source_.producer().Push(Status::Invalid("XYZ")); - } + void EmitErrorSub() { push_source_.producer().Push(Status::Invalid("XYZ")); } void FinishSub(int sub_index) { EXPECT_LT(sub_index, tracked_subs_.size()); push_subs_[sub_index].producer().Close(); } - void FinishSubs() { - push_source_.producer().Close(); - } + void FinishSubs() { push_source_.producer().Close(); } - void AssertFinishedOk() { - ASSERT_FINISHES_OK(finished_); - } + void AssertFinishedOk() { ASSERT_FINISHES_OK(finished_); } - void AssertFailed() { - ASSERT_FINISHES_AND_RAISES(Invalid, finished_); - } + void AssertFailed() { ASSERT_FINISHES_AND_RAISES(Invalid, finished_); } int NumItemsAskedFor(int sub_index) { EXPECT_LT(sub_index, tracked_subs_.size()); return tracked_subs_[sub_index].num_read(); } - int NumSubsAskedFor() { - return tracked_source_.num_read(); - } + int NumSubsAskedFor() { return tracked_source_.num_read(); } 
void AssertRead(std::vector values) { ASSERT_EQ(values.size(), sink_.size()); @@ -772,11 +761,13 @@ class SeqMergedGeneratorTestFixture : public ::testing::Test { TrackingGenerator> tracked_source_; Future<> finished_; std::vector sink_; - }; TEST_F(SeqMergedGeneratorTestFixture, Basic) { - ASSERT_OK_AND_ASSIGN(AsyncGenerator gen, MakeSequencedMergedGenerator(static_cast>>(tracked_source_), 4)); + ASSERT_OK_AND_ASSIGN( + AsyncGenerator gen, + MakeSequencedMergedGenerator( + static_cast>>(tracked_source_), 4)); // Should not initially ask for anything ASSERT_EQ(0, NumSubsAskedFor()); BeginCaptureOutput(gen); @@ -823,12 +814,16 @@ TEST_F(SeqMergedGeneratorTestFixture, Basic) { } TEST_F(SeqMergedGeneratorTestFixture, ErrorItem) { - ASSERT_OK_AND_ASSIGN(AsyncGenerator gen, MakeSequencedMergedGenerator(static_cast>>(tracked_source_), 4)); + ASSERT_OK_AND_ASSIGN( + AsyncGenerator gen, + MakeSequencedMergedGenerator( + static_cast>>(tracked_source_), 4)); BeginCaptureOutput(gen); EmitSub(); EmitSub(); EmitErrorItem(1); - // It will still read from the active sub and won't notice the error until it switches to the failing sub + // It will still read from the active sub and won't notice the error until it switches + // to the failing sub EmitItem(0, 0); AssertRead({0}); FinishSub(0); @@ -836,7 +831,10 @@ TEST_F(SeqMergedGeneratorTestFixture, ErrorItem) { } TEST_F(SeqMergedGeneratorTestFixture, ErrorSub) { - ASSERT_OK_AND_ASSIGN(AsyncGenerator gen, MakeSequencedMergedGenerator(static_cast>>(tracked_source_), 4)); + ASSERT_OK_AND_ASSIGN( + AsyncGenerator gen, + MakeSequencedMergedGenerator( + static_cast>>(tracked_source_), 4)); BeginCaptureOutput(gen); EmitSub(); EmitErrorSub(); diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index e5590c4a6bf..54fa9f6f2d0 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -422,6 +422,42 @@ def test_scanner(dataset, dataset_reader): assert table.num_rows == 
scanner.count_rows() +def test_scanner_backpressure(): + batch = pa.record_batch([pa.array([1, 2, 3])], names=['data']) + num_read = 0 + min_read = 64 # From kDefaultBackpressureHigh in scanner.h + end = 200 + + def counting_generator(): + nonlocal num_read + while num_read < end: + time.sleep(0.01) + num_read += 1 + yield batch + + scanner = ds.Scanner.from_batches( + counting_generator(), batch.schema, use_threads=True, use_async=True) + + _batch_iter = scanner.to_batches() # This line starts the scan + + start = time.time() + + def duration(): + return time.time() - start + + last_num_read = 0 + backpressure_probably_hit = False + while duration() < 10: + if num_read > min_read and num_read == last_num_read: + backpressure_probably_hit = True + break + last_num_read = num_read + time.sleep(0.5) + + assert backpressure_probably_hit + assert len(list(_batch_iter)) == end + + def test_head(dataset, dataset_reader): result = dataset_reader.head(dataset, 0) assert result == pa.Table.from_batches([], schema=dataset.schema) From 262e379a1cd609c0327160c71ad960a240dd98d1 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 12 Oct 2021 13:19:47 -1000 Subject: [PATCH 3/6] ARROW-14192: Fixing test issue introduced by rebase --- cpp/src/arrow/util/async_generator_test.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index d0ca021b061..67c6e201d60 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -672,7 +672,7 @@ class AutoStartingGeneratorTestFixture : public GeneratorTestFixture {}; TEST_P(AutoStartingGeneratorTestFixture, Basic) { AsyncGenerator source = MakeSource({1, 2, 3}); - TrackingGenerator tracked(source); + util::TrackingGenerator tracked(source); AsyncGenerator gen = MakeAutoStartingGenerator(static_cast>(tracked)); ASSERT_EQ(1, tracked.num_read()); @@ -722,7 +722,7 @@ class 
SeqMergedGeneratorTestFixture : public ::testing::Test { void EmitSub() { PushGenerator sub; - TrackingGenerator tracked_sub(sub); + util::TrackingGenerator tracked_sub(sub); tracked_subs_.push_back(tracked_sub); push_subs_.push_back(std::move(sub)); push_source_.producer().Push(std::move(tracked_sub)); @@ -757,8 +757,8 @@ class SeqMergedGeneratorTestFixture : public ::testing::Test { PushGenerator> push_source_; std::vector> push_subs_; - std::vector> tracked_subs_; - TrackingGenerator> tracked_source_; + std::vector> tracked_subs_; + util::TrackingGenerator> tracked_source_; Future<> finished_; std::vector sink_; }; From d1ac5046872f1be80aff541ce48941fffc5391b1 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 12 Oct 2021 13:41:57 -1000 Subject: [PATCH 4/6] ARROW-14192: Added the C++ ordered backpressure scanning test back in --- cpp/src/arrow/dataset/scanner_test.cc | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 40a0e005a3f..83151de2c02 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1027,12 +1027,12 @@ class TestBackpressure : public ::testing::Test { return sum; } - void Finish(AsyncGenerator gen) { + template + void Finish(AsyncGenerator gen) { for (const auto& controlled_fragment : controlled_fragments_) { controlled_fragment->Finish(); } - ASSERT_FINISHES_OK(VisitAsyncGenerator( - gen, [](EnumeratedRecordBatch batch) { return Status::OK(); })); + ASSERT_FINISHES_OK(VisitAsyncGenerator(gen, [](T batch) { return Status::OK(); })); } std::shared_ptr schema_ = schema({field("values", int32())}); @@ -1063,6 +1063,24 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) { Finish(std::move(gen)); } +TEST_F(TestBackpressure, ScanBatchesOrdered) { + std::shared_ptr scanner = MakeScanner(); + EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, + scanner->ScanBatchesAsync()); + // This future never 
actually finishes because we only emit the first batch so far and + // the scanner delays by one batch. It is enough to start the system pumping though so + // we don't need it to finish. + Future fut = gen(); + + // See note on other test + GetCpuThreadPool()->WaitForIdle(); + // Worst case we read in the entire set of initial batches + ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1); + + DeliverAdditionalBatches(); + Finish(std::move(gen)); +} + struct BatchConsumer { explicit BatchConsumer(EnumeratedRecordBatchGenerator generator) : generator(std::move(generator)), next() {} From de0c5ec4a62d9abde9970e4f3224f82656a63766 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 18 Oct 2021 11:40:40 -1000 Subject: [PATCH 5/6] ARROW-14192: Added a comment to MakeAutoStartingGenerator. Cleaned up some grammar in async_generator.h comments. Removed timing-dependent backpressure test to avoid having too many timing dependent tests. --- cpp/src/arrow/util/async_generator.h | 47 +++++++++++++++++----------- python/pyarrow/tests/test_dataset.py | 36 --------------------- 2 files changed, 28 insertions(+), 55 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index c9fa215eb7b..eb852c0a9a8 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -111,14 +111,14 @@ Future<> VisitAsyncGenerator(AsyncGenerator generator, Visitor visitor) { return Loop(LoopBody{std::move(generator), std::move(visitor)}); } -/// \brief Waits for an async generator to complete, discarding results. +/// \brief Wait for an async generator to complete, discarding results. 
template Future<> DiscardAllFromAsyncGenerator(AsyncGenerator generator) { std::function visitor = [](const T&) { return Status::OK(); }; return VisitAsyncGenerator(generator, visitor); } -/// \brief Collects the results of an async generator into a vector +/// \brief Collect the results of an async generator into a vector template Future> CollectAsyncGenerator(AsyncGenerator generator) { auto vec = std::make_shared>(); @@ -258,7 +258,7 @@ class MappingGenerator { std::shared_ptr state_; }; -/// \brief Creates a generator that will apply the map function to each element of +/// \brief Create a generator that will apply the map function to each element of /// source. The map function is not called on the end token. /// /// Note: This function makes a copy of `map` for each item @@ -278,7 +278,7 @@ AsyncGenerator MakeMappedGenerator(AsyncGenerator source_generator, MapFn return MappingGenerator(std::move(source_generator), MapCallback{std::move(map)}); } -/// \brief Creates a generator that will apply the map function to +/// \brief Create a generator that will apply the map function to /// each element of source. The map function is not called on the end /// token. The result of the map function should be another /// generator; all these generators will then be flattened to produce @@ -417,7 +417,7 @@ class SequencingGenerator { const std::shared_ptr state_; }; -/// \brief Buffers an AsyncGenerator to return values in sequence order ComesAfter +/// \brief Buffer an AsyncGenerator to return values in sequence order ComesAfter /// and IsNext determine the sequence order. 
/// /// ComesAfter should be a BinaryPredicate that only returns true if a comes after b @@ -532,7 +532,7 @@ class TransformingGenerator { std::shared_ptr state_; }; -/// \brief Transforms an async generator using a transformer function returning a new +/// \brief Transform an async generator using a transformer function returning a new /// AsyncGenerator /// /// The transform function here behaves exactly the same as the transform function in @@ -686,7 +686,7 @@ class FutureFirstGenerator { std::shared_ptr state_; }; -/// \brief Transforms a Future> into an AsyncGenerator +/// \brief Transform a Future> into an AsyncGenerator /// that waits for the future to complete as part of the first item. /// /// This generator is not async-reentrant (even if the generator yielded by future is) @@ -697,7 +697,7 @@ AsyncGenerator MakeFromFuture(Future> future) { return FutureFirstGenerator(std::move(future)); } -/// \brief Creates a generator that will pull from the source into a queue. Unlike +/// \brief Create a generator that will pull from the source into a queue. Unlike /// MakeReadaheadGenerator this will not pull reentrantly from the source. /// /// The source generator does not need to be async-reentrant @@ -711,6 +711,15 @@ AsyncGenerator MakeSerialReadaheadGenerator(AsyncGenerator source_generato return SerialReadaheadGenerator(std::move(source_generator), max_readahead); } +/// \brief Create a generator that immediately pulls from the source +/// +/// Typical generators do not pull from their source until they themselves +/// are pulled. This generator does not follow that convention and will call +/// generator() once before it returns. The returned generator will otherwise +/// mirror the source. +/// +/// This generator forwards async-reentrant pressure to the source +/// This generator buffers one item (the first result) until it is delivered. 
template AsyncGenerator MakeAutoStartingGenerator(AsyncGenerator generator) { struct AutostartGenerator { @@ -939,7 +948,7 @@ class PushGenerator { const std::shared_ptr state_; }; -/// \brief Creates a generator that pulls reentrantly from a source +/// \brief Create a generator that pulls reentrantly from a source /// This generator will pull reentrantly from a source, ensuring that max_readahead /// requests are active at any given time. /// @@ -1157,7 +1166,7 @@ class MergedGenerator { std::shared_ptr state_; }; -/// \brief Creates a generator that takes in a stream of generators and pulls from up to +/// \brief Create a generator that takes in a stream of generators and pulls from up to /// max_subscriptions at a time /// /// Note: This may deliver items out of sequence. For example, items from the third @@ -1193,7 +1202,7 @@ Result> MakeSequencedMergedGenerator( return MakeConcatenatedGenerator(std::move(sub_readahead)); } -/// \brief Creates a generator that takes in a stream of generators and pulls from each +/// \brief Create a generator that takes in a stream of generators and pulls from each /// one in sequence. /// /// This generator is async-reentrant but will never pull from source reentrantly and @@ -1261,7 +1270,7 @@ class EnumeratingGenerator { std::shared_ptr state_; }; -/// Wraps items from a source generator with positional information +/// Wrap items from a source generator with positional information /// /// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be /// processed in a "first-available" fashion and later resequenced which can reduce the @@ -1297,7 +1306,7 @@ class TransferringGenerator { internal::Executor* executor_; }; -/// \brief Transfers a future to an underlying executor. +/// \brief Transfer a future to an underlying executor. /// /// Continuations run on the returned future will be run on the given executor /// if they cannot be run synchronously. 
@@ -1543,7 +1552,7 @@ class BackgroundGenerator { constexpr int kDefaultBackgroundMaxQ = 32; constexpr int kDefaultBackgroundQRestart = 16; -/// \brief Creates an AsyncGenerator by iterating over an Iterator on a background +/// \brief Create an AsyncGenerator by iterating over an Iterator on a background /// thread /// /// The parameter max_q and q_restart control queue size and background thread task @@ -1591,14 +1600,14 @@ class GeneratorIterator { AsyncGenerator source_; }; -/// \brief Converts an AsyncGenerator to an Iterator by blocking until each future +/// \brief Convert an AsyncGenerator to an Iterator which blocks until each future /// is finished template Iterator MakeGeneratorIterator(AsyncGenerator source) { return Iterator(GeneratorIterator(std::move(source))); } -/// \brief Adds readahead to an iterator using a background thread. +/// \brief Add readahead to an iterator using a background thread. /// /// Under the hood this is converting the iterator to a generator using /// MakeBackgroundGenerator, adding readahead to the converted generator with @@ -1670,7 +1679,7 @@ AsyncGenerator MakeFailingGenerator(const Result& result) { return MakeFailingGenerator(result.status()); } -/// \brief Prepends initial_values onto a generator +/// \brief Prepend initial_values onto a generator /// /// This generator is async-reentrant but will buffer requests and will not /// pull from following_values async-reentrantly. 
@@ -1696,7 +1705,7 @@ struct CancellableGenerator { StopToken stop_token; }; -/// \brief Allows an async generator to be cancelled +/// \brief Allow an async generator to be cancelled /// /// This generator is async-reentrant template @@ -1735,7 +1744,7 @@ struct PauseableGenerator { std::shared_ptr state_; }; -/// \brief Allows an async generator to be paused +/// \brief Allow an async generator to be paused /// /// This generator is NOT async-reentrant and calling it in an async-reentrant fashion /// may lead to items getting reordered (and potentially truncated if the end token is diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 54fa9f6f2d0..e5590c4a6bf 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -422,42 +422,6 @@ def test_scanner(dataset, dataset_reader): assert table.num_rows == scanner.count_rows() -def test_scanner_backpressure(): - batch = pa.record_batch([pa.array([1, 2, 3])], names=['data']) - num_read = 0 - min_read = 64 # From kDefaultBackpressureHigh in scanner.h - end = 200 - - def counting_generator(): - nonlocal num_read - while num_read < end: - time.sleep(0.01) - num_read += 1 - yield batch - - scanner = ds.Scanner.from_batches( - counting_generator(), batch.schema, use_threads=True, use_async=True) - - _batch_iter = scanner.to_batches() # This line starts the scan - - start = time.time() - - def duration(): - return time.time() - start - - last_num_read = 0 - backpressure_probably_hit = False - while duration() < 10: - if num_read > min_read and num_read == last_num_read: - backpressure_probably_hit = True - break - last_num_read = num_read - time.sleep(0.5) - - assert backpressure_probably_hit - assert len(list(_batch_iter)) == end - - def test_head(dataset, dataset_reader): result = dataset_reader.head(dataset, 0) assert result == pa.Table.from_batches([], schema=dataset.schema) From 189516f111d534a80afb92da76ab8d7c155dcf15 Mon Sep 17 00:00:00 2001 
From: Weston Pace Date: Mon, 18 Oct 2021 11:43:28 -1000 Subject: [PATCH 6/6] ARROW-14192: Update cpp/src/arrow/util/async_generator.h Co-authored-by: David Li --- cpp/src/arrow/util/async_generator.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index eb852c0a9a8..0948e5537fe 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1192,7 +1192,7 @@ Result> MakeSequencedMergedGenerator( return Status::Invalid("max_subscriptions must be a positive integer"); } if (max_subscriptions == 1) { - return Status::Invalid("Use MakeConcatenatedGenerator is max_subscriptions is 1"); + return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1"); } AsyncGenerator> autostarting_source = MakeMappedGenerator( std::move(source),