From 0e9e018034c72acf48ba8499c32ce2c0cf82359c Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 1 Oct 2021 01:10:39 -1000 Subject: [PATCH 01/13] ARROW-13611: Added backpressure into the asynchronous scanner. Currently it only works for unordered scans. This commit adds backpressure in a way that bypasses the exec plan entirely and these utilities will eventually change as the exec plan evolves. --- cpp/src/arrow/compute/exec/options.h | 6 +- cpp/src/arrow/compute/exec/sink_node.cc | 12 +- cpp/src/arrow/dataset/scanner.cc | 26 ++-- cpp/src/arrow/dataset/scanner.h | 7 +- cpp/src/arrow/dataset/scanner_test.cc | 102 +++++++++++++++- cpp/src/arrow/testing/async_test_util.h | 54 ++++++++ cpp/src/arrow/util/async_generator.h | 65 +++++++++- cpp/src/arrow/util/async_generator_test.cc | 136 +++++++++++++++++---- cpp/src/arrow/util/async_util.cc | 36 ++++++ cpp/src/arrow/util/async_util.h | 50 ++++++++ 10 files changed, 446 insertions(+), 48 deletions(-) create mode 100644 cpp/src/arrow/testing/async_test_util.h diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 1fc6db642e0..90b27caa3ec 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -26,6 +26,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" #include "arrow/compute/exec/expression.h" +#include "arrow/util/async_util.h" #include "arrow/util/optional.h" #include "arrow/util/visibility.h" @@ -110,10 +111,11 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions { /// Emitted batches will not be ordered. class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions { public: - explicit SinkNodeOptions(std::function>()>* generator) - : generator(generator) {} + explicit SinkNodeOptions(std::function>()>* generator, util::BackpressureOptions backpressure = {}) + : generator(generator), backpressure(std::move(backpressure)) {} std::function>()>* generator; + util::BackpressureOptions backpressure; }; class ARROW_EXPORT SinkNodeConsumer { diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index e1c10edd8a9..51262ccb933 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -49,22 +49,22 @@ namespace { class SinkNode : public ExecNode { public: SinkNode(ExecPlan* plan, std::vector inputs, - AsyncGenerator>* generator) + AsyncGenerator>* generator, util::BackpressureOptions backpressure) : ExecNode(plan, std::move(inputs), {"collected"}, {}, /*num_outputs=*/0), - producer_(MakeProducer(generator)) {} + producer_(MakeProducer(generator, std::move(backpressure))) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "SinkNode")); const auto& sink_options = checked_cast(options); - return plan->EmplaceNode(plan, std::move(inputs), sink_options.generator); + return plan->EmplaceNode(plan, std::move(inputs), sink_options.generator, sink_options.backpressure); } static PushGenerator>::Producer MakeProducer( - AsyncGenerator>* out_gen) { - PushGenerator> push_gen; + AsyncGenerator>* out_gen, util::BackpressureOptions backpressure) { + PushGenerator> push_gen(std::move(backpressure)); auto out = push_gen.producer(); *out_gen = std::move(push_gen); return out; @@ -235,7 +235,7 @@ struct OrderBySinkNode final : public SinkNode { OrderBySinkNode(ExecPlan* plan, std::vector inputs, std::unique_ptr impl, AsyncGenerator>* generator) - : SinkNode(plan, 
std::move(inputs), generator), impl_{std::move(impl)} {} + : SinkNode(plan, std::move(inputs), generator, {}), impl_{std::move(impl)} {} const char* kind_name() const override { return "OrderBySinkNode"; } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 433e93172c9..6650e46041c 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -598,20 +598,23 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get())); AsyncGenerator> sink_gen; + util::BackpressureOptions backpressure = + util::MakeBackpressureOptions(kDefaultBackpressureLow, kDefaultBackpressureHigh); auto exprs = scan_options_->projection.call()->arguments; auto names = checked_cast( scan_options_->projection.call()->options.get()) ->field_names; - RETURN_NOT_OK(compute::Declaration::Sequence( - { - {"scan", ScanNodeOptions{dataset_, scan_options_}}, - {"filter", compute::FilterNodeOptions{scan_options_->filter}}, - {"augmented_project", - compute::ProjectNodeOptions{std::move(exprs), std::move(names)}}, - {"sink", compute::SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); + RETURN_NOT_OK( + compute::Declaration::Sequence( + { + {"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle}}, + {"filter", compute::FilterNodeOptions{scan_options_->filter}}, + {"augmented_project", + compute::ProjectNodeOptions{std::move(exprs), std::move(names)}}, + {"sink", compute::SinkNodeOptions{&sink_gen, std::move(backpressure)}}, + }) + .AddToPlan(plan.get())); RETURN_NOT_OK(plan->StartProducing()); @@ -1139,6 +1142,7 @@ Result MakeScanNode(compute::ExecPlan* plan, const auto& scan_node_options = checked_cast(options); auto scan_options = scan_node_options.scan_options; auto dataset = scan_node_options.dataset; + const auto& backpressure_toggle = scan_node_options.backpressure_toggle; if (!scan_options->use_async) { return Status::NotImplemented("ScanNodes without asynchrony"); @@ -1201,6 +1205,10 @@ Result MakeScanNode(compute::ExecPlan* plan, return batch; }); + if (backpressure_toggle) { + gen = MakePauseable(gen, backpressure_toggle); + } + auto fields = scan_options->dataset_schema->fields(); for (const auto& aug_field : kAugmentedFields) { fields.push_back(aug_field); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 9264e9f548a..6eaeb59d978 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -53,6 +53,8 @@ namespace dataset { constexpr int64_t kDefaultBatchSize = 1 << 20; constexpr int32_t kDefaultBatchReadahead = 32; constexpr int32_t kDefaultFragmentReadahead = 8; +constexpr int32_t kDefaultBackpressureHigh = 64; +constexpr int32_t kDefaultBackpressureLow = 32; /// Scan-specific options, which can be changed between scans of the same dataset. 
struct ARROW_DS_EXPORT ScanOptions { @@ -418,11 +420,12 @@ class ARROW_DS_EXPORT ScannerBuilder { class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions { public: explicit ScanNodeOptions(std::shared_ptr dataset, - std::shared_ptr scan_options) - : dataset(std::move(dataset)), scan_options(std::move(scan_options)) {} + std::shared_ptr scan_options, std::shared_ptr backpressure_toggle = NULLPTR) + : dataset(std::move(dataset)), scan_options(std::move(scan_options)), backpressure_toggle(std::move(backpressure_toggle)) {} std::shared_ptr dataset; std::shared_ptr scan_options; + std::shared_ptr backpressure_toggle; }; /// @} diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 6235cf2fd50..2996bc5744c 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -32,6 +32,7 @@ #include "arrow/dataset/test_util.h" #include "arrow/record_batch.h" #include "arrow/table.h" +#include "arrow/testing/async_test_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" @@ -740,7 +741,8 @@ INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner, class ControlledFragment : public Fragment { public: explicit ControlledFragment(std::shared_ptr schema) - : Fragment(literal(true), std::move(schema)) {} + : Fragment(literal(true), std::move(schema)), record_batch_generator_(), tracking_generator_(record_batch_generator_) { + } Result Scan(std::shared_ptr options) override { return Status::NotImplemented( @@ -753,9 +755,11 @@ class ControlledFragment : public Fragment { Result ScanBatchesAsync( const std::shared_ptr& options) override { - return record_batch_generator_; + return tracking_generator_; }; + int NumBatchesRead() { return tracking_generator_.num_read(); } + void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); } void DeliverBatch(uint32_t num_rows) { auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_); @@ -764,6 +768,7 @@ class ControlledFragment : public Fragment { private: PushGenerator> record_batch_generator_; + util::TrackingGenerator> tracking_generator_; }; // TODO(ARROW-8163) Add testing for fragments arriving out of order @@ -963,6 +968,99 @@ TEST_F(TestReordering, ScanBatchesUnordered) { AssertBatchesInOrder(collected, {0, 0, 1, 1, 2}, {0, 2, 3, 1, 4}); } +class TestBackpressure : public ::testing::Test { + + protected: + + static constexpr int NFRAGMENTS = 10; + static constexpr int NBATCHES = 1000; + static constexpr int NROWS = 10; + + + FragmentVector MakeFragments() { + FragmentVector fragments; + for (int i = 0; i < NFRAGMENTS; i++) { + controlled_fragments_.emplace_back(std::make_shared(schema_)); + fragments.push_back(controlled_fragments_[i]); + // We only emit one batch on the first fragment. 
This triggers the sequencing + // generator to dig really deep to try and find the second batch + int num_to_emit = NBATCHES; + if (i == 0) { + num_to_emit = 1; + } + for (int j = 0; j < num_to_emit; j++) { + controlled_fragments_[i]->DeliverBatch(NROWS); + } + } + return fragments; + } + + std::shared_ptr MakeDataset() { + FragmentVector fragments = MakeFragments(); + return std::make_shared(schema_, std::move(fragments)); + } + + std::shared_ptr MakeScanner() { + std::shared_ptr dataset = MakeDataset(); + std::shared_ptr options = std::make_shared(); + ScannerBuilder builder(std::move(dataset), options); + ARROW_EXPECT_OK(builder.UseThreads(true)); + ARROW_EXPECT_OK(builder.UseAsync(true)); + ARROW_EXPECT_OK(builder.FragmentReadahead(4)); + EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish()); + return scanner; + } + + int TotalBatchesRead() { + int sum = 0; + for (const auto& controlled_fragment : controlled_fragments_) { + sum += controlled_fragment->NumBatchesRead(); + } + return sum; + } + + std::shared_ptr schema_ = schema({field("values", int32())}); + std::vector> controlled_fragments_; + +}; + +TEST_F(TestBackpressure, ScanBatchesUnordered) { + std::shared_ptr scanner = MakeScanner(); + EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, scanner->ScanBatchesUnorderedAsync()); + ASSERT_FINISHES_OK(gen()); + + // The exact numbers may be imprecise due to threading but we should pretty quickly read + // up to our backpressure limit and a little above. We should not be able to go too far + // above. + BusyWait(10, [&] { + return TotalBatchesRead() >= kDefaultBackpressureHigh; + }); + SleepABit(); + ASSERT_LT(TotalBatchesRead(), 2 * kDefaultBackpressureHigh); +} + +TEST_F(TestBackpressure, DISABLED_ScanBatchesOrdered) { + std::shared_ptr scanner = MakeScanner(); + EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, scanner->ScanBatchesAsync()); + // This future never actually finishes because we only emit the first batch so far and + // the scanner delays by one batch. It is enough to start the system pumping though so + // we don't need it to finish. + Future fut =gen(); + + // The exact numbers may be imprecise due to threading but we should pretty quickly read + // up to our backpressure limit and a little above. We should not be able to go too far + // above. + BusyWait(10, [&] { + return TotalBatchesRead() >= kDefaultBackpressureHigh; + }); + // This can yield some false passes but it is tricky to test that a counter doesn't + // increase over time. + for (int i = 0; i < 20; i++) { + SleepABit(); + } + ASSERT_LT(TotalBatchesRead(), 2 * kDefaultBackpressureHigh); +} + struct BatchConsumer { explicit BatchConsumer(EnumeratedRecordBatchGenerator generator) : generator(std::move(generator)), next() {} diff --git a/cpp/src/arrow/testing/async_test_util.h b/cpp/src/arrow/testing/async_test_util.h new file mode 100644 index 00000000000..9fbe33dd91e --- /dev/null +++ b/cpp/src/arrow/testing/async_test_util.h @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/util/future.h" +#include "arrow/util/async_generator.h" + +namespace arrow { +namespace util { + +template +class TrackingGenerator { + public: + explicit TrackingGenerator(AsyncGenerator source) + : state_(std::make_shared(std::move(source))) {} + + Future operator()() { + state_->num_read++; + return state_->source(); + } + + int num_read() { return state_->num_read.load(); } + + private: + struct State { + explicit State(AsyncGenerator source) : source(std::move(source)), num_read(0) {} + + AsyncGenerator source; + std::atomic num_read; + }; + + std::shared_ptr state_; +}; + +} +} \ No newline at end of file diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index e751c7511d3..65749a0d2cb 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -24,6 +24,7 @@ #include #include +#include "arrow/util/async_util.h" #include "arrow/util/functional.h" #include "arrow/util/future.h" #include "arrow/util/io_util.h" @@ -785,6 +786,24 @@ class ReadaheadGenerator { template class PushGenerator { struct State { + State(util::BackpressureOptions backpressure) + : backpressure(std::move(backpressure)) {} + + void OpenBackpressureIfFree() { + if (backpressure.toggle && !backpressure.toggle->IsOpen() && + result_q.size() < backpressure.resume_if_below) { + backpressure.toggle->Open(); + } + } + + void CloseBackpressureIfFull() { + if (backpressure.toggle && backpressure.toggle->IsOpen() && + result_q.size() > backpressure.pause_if_above) { + backpressure.toggle->Close(); + } + } + + util::BackpressureOptions backpressure; util::Mutex mutex; std::deque> result_q; util::optional> consumer_fut; @@ -820,6 +839,7 @@ class PushGenerator { fut.MarkFinished(std::move(result)); } else { state->result_q.push_back(std::move(result)); + state->CloseBackpressureIfFull(); } return true; } @@ -868,7 +888,8 @@ class PushGenerator { const std::weak_ptr weak_state_; }; - PushGenerator() : state_(std::make_shared()) {} + PushGenerator(util::BackpressureOptions backpressure = {}) + : state_(std::make_shared(std::move(backpressure))) {} /// Read an item from the queue Future operator()() const { @@ -877,6 +898,7 @@ class PushGenerator { if (!state_->result_q.empty()) { auto fut = Future::MakeFinished(std::move(state_->result_q.front())); state_->result_q.pop_front(); + state_->OpenBackpressureIfFree(); return fut; } if (state_->finished) { @@ -1645,6 +1667,47 @@ AsyncGenerator MakeCancellable(AsyncGenerator source, StopToken stop_token return CancellableGenerator{std::move(source), std::move(stop_token)}; } +template +struct PauseableGenerator { + public: + PauseableGenerator(AsyncGenerator source, std::shared_ptr toggle) + : state_(std::make_shared(std::move(source), + std::move(toggle))) {} + + Future operator()() { return (*state_)(); } + + private: + struct PauseableGeneratorState + : public std::enable_shared_from_this { + PauseableGeneratorState(AsyncGenerator source, + std::shared_ptr toggle) + : source_(std::move(source)), toggle_(std::move(toggle)) {} + + Future 
operator()() { + std::shared_ptr self = this->shared_from_this(); + return toggle_->WhenOpen().Then([self] { + util::Mutex::Guard guard = self->mutex_.Lock(); + return self->source_(); + }); + } + + AsyncGenerator source_; + std::shared_ptr toggle_; + util::Mutex mutex_; + }; + std::shared_ptr state_; +}; + +/// \brief Allows an async generator to be paused +/// +/// This generator is async-reentrant if the source is +/// This generator forwards async-reentrant pressure +template +AsyncGenerator MakePauseable(AsyncGenerator source, + std::shared_ptr toggle) { + return PauseableGenerator(std::move(source), std::move(toggle)); +} + template class DefaultIfEmptyGenerator { public: diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index cb269ccb684..0dca4d13dc5 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -27,8 +27,10 @@ #include "arrow/io/slow.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" +#include "arrow/testing/async_test_util.h" #include "arrow/type_fwd.h" #include "arrow/util/async_generator.h" +#include "arrow/util/async_util.h" #include "arrow/util/optional.h" #include "arrow/util/test_common.h" #include "arrow/util/vector.h" @@ -72,30 +74,6 @@ AsyncGenerator MakeJittery(AsyncGenerator source) { }); } -template -class TrackingGenerator { - public: - explicit TrackingGenerator(AsyncGenerator source) - : state_(std::make_shared(std::move(source))) {} - - Future operator()() { - state_->num_read++; - return state_->source(); - } - - int num_read() { return state_->num_read.load(); } - - private: - struct State { - explicit State(AsyncGenerator source) : source(std::move(source)), num_read(0) {} - - AsyncGenerator source; - std::atomic num_read; - }; - - std::shared_ptr state_; -}; - // Yields items with a small pause between each one from a background thread std::function()> BackgroundAsyncVectorIt( std::vector v, bool sleep = true, int max_q = kDefaultBackgroundMaxQ, @@ -386,7 +364,7 @@ TEST(TestAsyncUtil, MapAsync) { TEST(TestAsyncUtil, MapReentrant) { std::vector input = {1, 2}; auto source = AsyncVectorIt(input); - TrackingGenerator tracker(std::move(source)); + util::TrackingGenerator tracker(std::move(source)); source = MakeTransferredGenerator(AsyncGenerator(tracker), internal::GetCpuThreadPool()); @@ -590,7 +568,7 @@ TEST_P(MergedGeneratorTestFixture, MergedLimitedSubscriptions) { auto gen = AsyncVectorIt>( {MakeSource({1, 2}), MakeSource({3, 4}), MakeSource({5, 6, 7, 8}), MakeSource({9, 10, 11, 12})}); - TrackingGenerator> tracker(std::move(gen)); + util::TrackingGenerator> tracker(std::move(gen)); auto merged = MakeMergedGenerator(AsyncGenerator>(tracker), 2); SleepABit(); @@ -1263,6 +1241,98 @@ TEST_P(EnumeratorTestFixture, Error) { INSTANTIATE_TEST_SUITE_P(EnumeratedTests, EnumeratorTestFixture, ::testing::Values(false, true)); +class PauseableTestFixture : public GeneratorTestFixture { + + protected: + PauseableTestFixture() : toggle_(std::make_shared()) { + sink_.clear(); + counter_ = 0; + AsyncGenerator source = GetSource(); + AsyncGenerator pauseable = MakePauseable(std::move(source), toggle_); + finished_ = VisitAsyncGenerator(std::move(pauseable), [this] (TestInt val) { + std::lock_guard lg(mutex_); + sink_.push_back(val.value); + return Status::OK(); + }); + } + + void Emit() { + generator_.producer().Push(counter_++); + } + + void Pause() { + toggle_->Close(); + } + + void Resume() { + toggle_->Open(); + } + + int 
NumCollected() { + std::lock_guard lg(mutex_); + // The push generator can desequence things so we check and don't count gaps. It's + // a bit inefficient but good enough for this test + int count = 0; + for (std::size_t i = 0; i < sink_.size(); i++) { + int prev_count = count; + for (std::size_t j = 0; j < sink_.size(); j++) { + if (sink_[j] == count) { + count++; + break; + } + } + if (prev_count == count) { + break; + } + } + return count; + } + + void AssertAtLeastNCollected(int target_count) { + BusyWait(10, [this, target_count] { + return NumCollected() >= target_count; + }); + ASSERT_GE(NumCollected(), target_count); + } + + void AssertNoMoreThanNCollected(int target_count) { + ASSERT_LE(NumCollected(), target_count); + } + + AsyncGenerator GetSource() { + const auto& source = static_cast>(generator_); + if (IsSlow()) { + return SlowdownABit(source); + } else { + return source; + } + } + + std::mutex mutex_; + int counter_ = 0; + PushGenerator generator_; + std::shared_ptr toggle_; + std::vector sink_; + Future<> finished_; + +}; + +INSTANTIATE_TEST_SUITE_P(PauseableTests, PauseableTestFixture, + ::testing::Values(false, true)); + +TEST_P(PauseableTestFixture, PauseBasic) { + Emit(); + Pause(); + // This emit was asked for before the pause so it will go through + Emit(); + AssertNoMoreThanNCollected(2); + // This emit should be blocked by the pause + Emit(); + AssertNoMoreThanNCollected(2); + Resume(); + AssertAtLeastNCollected(3); +} + class SequencerTestFixture : public GeneratorTestFixture { protected: void RandomShuffle(std::vector& values) { @@ -1361,6 +1431,20 @@ TEST_P(SequencerTestFixture, SequenceError) { } } +TEST_P(SequencerTestFixture, Readahead) { + AsyncGenerator original = MakeSource({4, 2, 0, 6}); + util::TrackingGenerator tracker(original); + AsyncGenerator sequenced = MakeSequencingGenerator(static_cast>(tracker), cmp_, is_next_, TestInt(-2)); + ASSERT_FINISHES_OK_AND_EQ(TestInt(0), sequenced()); + ASSERT_EQ(3, tracker.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(2), sequenced()); + ASSERT_EQ(3, tracker.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(4), sequenced()); + ASSERT_EQ(3, tracker.num_read()); + ASSERT_FINISHES_OK_AND_EQ(TestInt(6), sequenced()); + ASSERT_EQ(4, tracker.num_read()); +} + TEST_P(SequencerTestFixture, SequenceStress) { constexpr int NITEMS = 100; for (auto task_index = 0; task_index < GetNumItersForStress(); task_index++) { diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 9407684bdda..cbdfd38d509 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -162,5 +162,41 @@ bool SerializedAsyncTaskGroup::TryDrainUnlocked() { return false; } +Future<> AsyncToggle::WhenOpen() { + util::Mutex::Guard guard = mutex_.Lock(); + return when_open_; +} + +void AsyncToggle::Open() { + util::Mutex::Guard guard = mutex_.Lock(); + if (when_open_.is_finished()) { + return; + } + // This swap is needed to ensure we mark finished outside the lock. It does mean that + // later calls to Open might finish before earlier calls to Open but that should be ok. 
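+  // (A related subtlety: MarkFinished runs continuations inline on the calling
+  // thread, and a continuation may call back into this toggle, whose mutex_ is
+  // not recursive, so finishing while still holding the lock could deadlock;
+  // hence the swap, unlock, then MarkFinished sequence below.)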
+ Future<> to_finish = std::move(when_open_); + when_open_ = Future<>::MakeFinished(); + guard.Unlock(); + to_finish.MarkFinished(); +} + +void AsyncToggle::Close() { + util::Mutex::Guard guard = mutex_.Lock(); + if (!when_open_.is_finished()) { + return; + } + when_open_ = Future<>::Make(); +} + +bool AsyncToggle::IsOpen() { + util::Mutex::Guard guard = mutex_.Lock(); + return when_open_.is_finished(); +} + +BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above) { + auto toggle = std::make_shared(); + return BackpressureOptions{std::move(toggle), resume_if_below, pause_if_above}; +} + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index daa6bad8cee..9b9585d2cb8 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -195,5 +195,55 @@ class ARROW_EXPORT SerializedAsyncTaskGroup { Future<> processing_; }; +class ARROW_EXPORT AsyncToggle { + public: + /// Get a future that will complete when the toggle next becomes open + /// + /// If the toggle is open this returns immediately + /// If the toggle is closed this future will be unfinished until the next call to Open + Future<> WhenOpen(); + /// \brief Close the toggle + /// + /// After this call any call to WhenOpen will be delayed until the next open + void Close(); + /// \brief Open the toggle + /// + /// All current waiters will be released to enter, even if another close call + /// quickly follows + void Open(); + + /// \brief Return true if the toggle is currently open + bool IsOpen(); + + private: + Future<> when_open_ = Future<>::MakeFinished(); + util::Mutex mutex_; +}; + +/// \brief Options to control backpressure behavior +struct BackpressureOptions { + /// \brief Create default options that perform no backpressure + BackpressureOptions() : toggle(NULLPTR), resume_if_below(0), pause_if_above(0){}; + /// \brief Create options that will perform backpressure + /// + /// \param toggle A toggle to be shared between the producer and consumer + /// \param resume_if_below The producer should resume producing if the backpressure + /// queue has fewer than resume_if_below items. 
+ /// \param pause_if_above The producer should pause producing if the backpressure + /// queue has more than pause_if_above items + BackpressureOptions(std::shared_ptr toggle, uint32_t resume_if_below, + uint32_t pause_if_above) + : toggle(std::move(toggle)), + resume_if_below(resume_if_below), + pause_if_above(pause_if_above) {} + + std::shared_ptr toggle; + uint32_t resume_if_below; + uint32_t pause_if_above; +}; + +BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below = 32, + uint32_t pause_if_above = 64); + } // namespace util } // namespace arrow From 1b596cc7041d91d52d3fe43037f6485db250582a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 1 Oct 2021 01:17:19 -1000 Subject: [PATCH 02/13] ARROW-13611: Lint --- cpp/src/arrow/compute/exec/options.h | 3 ++- cpp/src/arrow/compute/exec/sink_node.cc | 9 +++++--- cpp/src/arrow/dataset/scanner.h | 9 +++++--- cpp/src/arrow/dataset/scanner_test.cc | 25 ++++++++------------ cpp/src/arrow/testing/async_test_util.h | 6 ++--- cpp/src/arrow/util/async_generator.h | 4 ++-- cpp/src/arrow/util/async_generator_test.cc | 27 ++++++++-------------- cpp/src/arrow/util/async_util.cc | 3 ++- cpp/src/arrow/util/async_util.h | 2 +- 9 files changed, 41 insertions(+), 47 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 90b27caa3ec..87349191e90 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -111,7 +111,8 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions { /// Emitted batches will not be ordered. class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions { public: - explicit SinkNodeOptions(std::function>()>* generator, util::BackpressureOptions backpressure = {}) + explicit SinkNodeOptions(std::function>()>* generator, + util::BackpressureOptions backpressure = {}) : generator(generator), backpressure(std::move(backpressure)) {} std::function>()>* generator; diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 51262ccb933..b1238a14bbb 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -49,7 +49,8 @@ namespace { class SinkNode : public ExecNode { public: SinkNode(ExecPlan* plan, std::vector inputs, - AsyncGenerator>* generator, util::BackpressureOptions backpressure) + AsyncGenerator>* generator, + util::BackpressureOptions backpressure) : ExecNode(plan, std::move(inputs), {"collected"}, {}, /*num_outputs=*/0), producer_(MakeProducer(generator, std::move(backpressure))) {} @@ -59,11 +60,13 @@ class SinkNode : public ExecNode { RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "SinkNode")); const auto& sink_options = checked_cast(options); - return plan->EmplaceNode(plan, std::move(inputs), sink_options.generator, sink_options.backpressure); + return plan->EmplaceNode(plan, std::move(inputs), sink_options.generator, + sink_options.backpressure); } static PushGenerator>::Producer MakeProducer( - AsyncGenerator>* out_gen, util::BackpressureOptions backpressure) { + AsyncGenerator>* out_gen, + util::BackpressureOptions backpressure) { PushGenerator> push_gen(std::move(backpressure)); auto out = push_gen.producer(); *out_gen = std::move(push_gen); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 6eaeb59d978..78746068d87 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -419,9 +419,12 @@ class ARROW_DS_EXPORT ScannerBuilder { /// ordering for simple 
ExecPlans. class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions { public: - explicit ScanNodeOptions(std::shared_ptr dataset, - std::shared_ptr scan_options, std::shared_ptr backpressure_toggle = NULLPTR) - : dataset(std::move(dataset)), scan_options(std::move(scan_options)), backpressure_toggle(std::move(backpressure_toggle)) {} + explicit ScanNodeOptions( + std::shared_ptr dataset, std::shared_ptr scan_options, + std::shared_ptr backpressure_toggle = NULLPTR) + : dataset(std::move(dataset)), + scan_options(std::move(scan_options)), + backpressure_toggle(std::move(backpressure_toggle)) {} std::shared_ptr dataset; std::shared_ptr scan_options; diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 2996bc5744c..4120014ea4a 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -741,8 +741,9 @@ INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner, class ControlledFragment : public Fragment { public: explicit ControlledFragment(std::shared_ptr schema) - : Fragment(literal(true), std::move(schema)), record_batch_generator_(), tracking_generator_(record_batch_generator_) { - } + : Fragment(literal(true), std::move(schema)), + record_batch_generator_(), + tracking_generator_(record_batch_generator_) {} Result Scan(std::shared_ptr options) override { return Status::NotImplemented( @@ -969,14 +970,11 @@ TEST_F(TestReordering, ScanBatchesUnordered) { } class TestBackpressure : public ::testing::Test { - protected: - static constexpr int NFRAGMENTS = 10; static constexpr int NBATCHES = 1000; static constexpr int NROWS = 10; - FragmentVector MakeFragments() { FragmentVector fragments; for (int i = 0; i < NFRAGMENTS; i++) { @@ -1021,38 +1019,35 @@ class TestBackpressure : public ::testing::Test { std::shared_ptr schema_ = schema({field("values", int32())}); std::vector> controlled_fragments_; - }; TEST_F(TestBackpressure, ScanBatchesUnordered) { std::shared_ptr scanner = MakeScanner(); - EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, scanner->ScanBatchesUnorderedAsync()); + EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, + scanner->ScanBatchesUnorderedAsync()); ASSERT_FINISHES_OK(gen()); // The exact numbers may be imprecise due to threading but we should pretty quickly read // up to our backpressure limit and a little above. We should not be able to go too far // above. - BusyWait(10, [&] { - return TotalBatchesRead() >= kDefaultBackpressureHigh; - }); + BusyWait(10, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; }); SleepABit(); ASSERT_LT(TotalBatchesRead(), 2 * kDefaultBackpressureHigh); } TEST_F(TestBackpressure, DISABLED_ScanBatchesOrdered) { std::shared_ptr scanner = MakeScanner(); - EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, scanner->ScanBatchesAsync()); + EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, + scanner->ScanBatchesAsync()); // This future never actually finishes because we only emit the first batch so far and // the scanner delays by one batch. It is enough to start the system pumping though so // we don't need it to finish. - Future fut =gen(); + Future fut = gen(); // The exact numbers may be imprecise due to threading but we should pretty quickly read // up to our backpressure limit and a little above. We should not be able to go too far // above. 
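  // ("A little above" is expected: the sink pauses the scan only after its
  //  queue already exceeds kDefaultBackpressureHigh (64), and fragment
  //  readahead may deliver a few more batches before the pause propagates.)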
- BusyWait(10, [&] { - return TotalBatchesRead() >= kDefaultBackpressureHigh; - }); + BusyWait(10, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; }); // This can yield some false passes but it is tricky to test that a counter doesn't // increase over time. for (int i = 0; i < 20; i++) { diff --git a/cpp/src/arrow/testing/async_test_util.h b/cpp/src/arrow/testing/async_test_util.h index 9fbe33dd91e..b9f5487ed0d 100644 --- a/cpp/src/arrow/testing/async_test_util.h +++ b/cpp/src/arrow/testing/async_test_util.h @@ -20,8 +20,8 @@ #include #include -#include "arrow/util/future.h" #include "arrow/util/async_generator.h" +#include "arrow/util/future.h" namespace arrow { namespace util { @@ -50,5 +50,5 @@ class TrackingGenerator { std::shared_ptr state_; }; -} -} \ No newline at end of file +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 65749a0d2cb..03fd7950ecd 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -786,7 +786,7 @@ class ReadaheadGenerator { template class PushGenerator { struct State { - State(util::BackpressureOptions backpressure) + explicit State(util::BackpressureOptions backpressure) : backpressure(std::move(backpressure)) {} void OpenBackpressureIfFree() { @@ -888,7 +888,7 @@ class PushGenerator { const std::weak_ptr weak_state_; }; - PushGenerator(util::BackpressureOptions backpressure = {}) + explicit PushGenerator(util::BackpressureOptions backpressure = {}) : state_(std::make_shared(std::move(backpressure))) {} /// Read an item from the queue diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 0dca4d13dc5..297cc59233e 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -25,9 +25,9 @@ #include #include "arrow/io/slow.h" +#include "arrow/testing/async_test_util.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" -#include "arrow/testing/async_test_util.h" #include "arrow/type_fwd.h" #include "arrow/util/async_generator.h" #include "arrow/util/async_util.h" @@ -1242,31 +1242,24 @@ INSTANTIATE_TEST_SUITE_P(EnumeratedTests, EnumeratorTestFixture, ::testing::Values(false, true)); class PauseableTestFixture : public GeneratorTestFixture { - protected: PauseableTestFixture() : toggle_(std::make_shared()) { sink_.clear(); counter_ = 0; AsyncGenerator source = GetSource(); AsyncGenerator pauseable = MakePauseable(std::move(source), toggle_); - finished_ = VisitAsyncGenerator(std::move(pauseable), [this] (TestInt val) { + finished_ = VisitAsyncGenerator(std::move(pauseable), [this](TestInt val) { std::lock_guard lg(mutex_); sink_.push_back(val.value); return Status::OK(); }); } - void Emit() { - generator_.producer().Push(counter_++); - } + void Emit() { generator_.producer().Push(counter_++); } - void Pause() { - toggle_->Close(); - } + void Pause() { toggle_->Close(); } - void Resume() { - toggle_->Open(); - } + void Resume() { toggle_->Open(); } int NumCollected() { std::lock_guard lg(mutex_); @@ -1289,10 +1282,8 @@ class PauseableTestFixture : public GeneratorTestFixture { } void AssertAtLeastNCollected(int target_count) { - BusyWait(10, [this, target_count] { - return NumCollected() >= target_count; - }); - ASSERT_GE(NumCollected(), target_count); + BusyWait(10, [this, target_count] { return NumCollected() >= target_count; }); + ASSERT_GE(NumCollected(), target_count); } void 
AssertNoMoreThanNCollected(int target_count) { @@ -1314,7 +1305,6 @@ class PauseableTestFixture : public GeneratorTestFixture { std::shared_ptr toggle_; std::vector sink_; Future<> finished_; - }; INSTANTIATE_TEST_SUITE_P(PauseableTests, PauseableTestFixture, @@ -1434,7 +1424,8 @@ TEST_P(SequencerTestFixture, SequenceError) { TEST_P(SequencerTestFixture, Readahead) { AsyncGenerator original = MakeSource({4, 2, 0, 6}); util::TrackingGenerator tracker(original); - AsyncGenerator sequenced = MakeSequencingGenerator(static_cast>(tracker), cmp_, is_next_, TestInt(-2)); + AsyncGenerator sequenced = MakeSequencingGenerator( + static_cast>(tracker), cmp_, is_next_, TestInt(-2)); ASSERT_FINISHES_OK_AND_EQ(TestInt(0), sequenced()); ASSERT_EQ(3, tracker.num_read()); ASSERT_FINISHES_OK_AND_EQ(TestInt(2), sequenced()); diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index cbdfd38d509..38aba2f4353 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -193,7 +193,8 @@ bool AsyncToggle::IsOpen() { return when_open_.is_finished(); } -BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above) { +BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below, + uint32_t pause_if_above) { auto toggle = std::make_shared(); return BackpressureOptions{std::move(toggle), resume_if_below, pause_if_above}; } diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index 9b9585d2cb8..f1f656b02d1 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -223,7 +223,7 @@ class ARROW_EXPORT AsyncToggle { /// \brief Options to control backpressure behavior struct BackpressureOptions { /// \brief Create default options that perform no backpressure - BackpressureOptions() : toggle(NULLPTR), resume_if_below(0), pause_if_above(0){}; + BackpressureOptions() : toggle(NULLPTR), resume_if_below(0), pause_if_above(0) {} /// \brief Create options that will perform backpressure /// /// \param toggle A toggle to be shared between the producer and consumer From cdb85b2f87846727f7569f73f5c0abcfc8bc5095 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 1 Oct 2021 01:50:31 -1000 Subject: [PATCH 03/13] ARROW-13611: Missing some ARROW_EXPORT flags --- cpp/src/arrow/util/async_util.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index f1f656b02d1..c71cae81bcf 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -221,7 +221,7 @@ class ARROW_EXPORT AsyncToggle { }; /// \brief Options to control backpressure behavior -struct BackpressureOptions { +struct ARROW_EXPORT BackpressureOptions { /// \brief Create default options that perform no backpressure BackpressureOptions() : toggle(NULLPTR), resume_if_below(0), pause_if_above(0) {} /// \brief Create options that will perform backpressure @@ -242,8 +242,8 @@ struct BackpressureOptions { uint32_t pause_if_above; }; -BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below = 32, - uint32_t pause_if_above = 64); +ARROW_EXPORT BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below = 32, + uint32_t pause_if_above = 64); } // namespace util } // namespace arrow From 0180c08a0f6ae45c25d12795a9dba86b53439b11 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 4 Oct 2021 13:39:28 -1000 Subject: [PATCH 04/13] ARROW-13611: Addressing PR comments. 
Fixed a potential race condition where the backpressure toggle was opened
while holding a lock. Added a unit test for the sink node.
---
 cpp/src/arrow/compute/exec/CMakeLists.txt    |  1 +
 cpp/src/arrow/compute/exec/sink_node_test.cc | 79 ++++++++++++++++++++
 cpp/src/arrow/util/async_generator.h         |  8 +-
 cpp/src/arrow/util/async_util.cc             | 13 ++--
 cpp/src/arrow/util/async_util.h              |  4 +
 cpp/src/arrow/util/async_util_test.cc        |  5 ++
 6 files changed, 99 insertions(+), 11 deletions(-)
 create mode 100644 cpp/src/arrow/compute/exec/sink_node_test.cc

diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt
index ccc36c093e8..0fccb05d957 100644
--- a/cpp/src/arrow/compute/exec/CMakeLists.txt
+++ b/cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -26,6 +26,7 @@ add_arrow_compute_test(expression_test
 add_arrow_compute_test(plan_test PREFIX "arrow-compute")
 add_arrow_compute_test(hash_join_node_test PREFIX "arrow-compute")
+add_arrow_compute_test(sink_node_test PREFIX "arrow-compute")
 add_arrow_compute_test(union_node_test PREFIX "arrow-compute")
 add_arrow_compute_test(util_test PREFIX "arrow-compute")

diff --git a/cpp/src/arrow/compute/exec/sink_node_test.cc b/cpp/src/arrow/compute/exec/sink_node_test.cc
new file mode 100644
index 00000000000..384a8302c4c
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/sink_node_test.cc
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
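+
+// Overview of the test below: the plan is fed by hand through a PushGenerator
+// source, so the test can check that pushing more than kPauseIfAbove batches
+// closes the shared backpressure toggle and that draining the sink back below
+// kResumeIfBelow reopens it.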
+ +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" + +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/options.h" +#include "arrow/util/async_generator.h" + +namespace arrow { +namespace compute { + +TEST(SinkNode, Backpressure) { + constexpr uint32_t kPauseIfAbove = 4; + constexpr uint32_t kResumeIfBelow = 2; + EXPECT_OK_AND_ASSIGN(std::shared_ptr plan, ExecPlan::Make()); + PushGenerator> batch_producer; + AsyncGenerator> sink_gen; + util::BackpressureOptions backpressure_options = + util::MakeBackpressureOptions(kResumeIfBelow, kPauseIfAbove); + std::shared_ptr schema_ = schema({field("data", uint32())}); + ARROW_EXPECT_OK(compute::Declaration::Sequence( + { + {"source", SourceNodeOptions(schema_, batch_producer)}, + {"sink", SinkNodeOptions{&sink_gen, backpressure_options}}, + }) + .AddToPlan(plan.get())); + ARROW_EXPECT_OK(plan->StartProducing()); + + EXPECT_OK_AND_ASSIGN(util::optional batch, ExecBatch::Make({MakeScalar(0)})); + ASSERT_TRUE(backpressure_options.toggle->IsOpen()); + + // Should be able to push kPauseIfAbove batches without triggering back pressure + for (uint32_t i = 0; i < kPauseIfAbove; i++) { + batch_producer.producer().Push(batch); + } + SleepABit(); + ASSERT_TRUE(backpressure_options.toggle->IsOpen()); + + // One more batch should trigger back pressure + batch_producer.producer().Push(batch); + BusyWait(10, [&] { return !backpressure_options.toggle->IsOpen(); }); + ASSERT_FALSE(backpressure_options.toggle->IsOpen()); + + // Reading as much as we can while keeping it paused + for (uint32_t i = kPauseIfAbove; i >= kResumeIfBelow; i--) { + ASSERT_FINISHES_OK(sink_gen()); + } + SleepABit(); + ASSERT_FALSE(backpressure_options.toggle->IsOpen()); + + // Reading one more item should open up backpressure + ASSERT_FINISHES_OK(sink_gen()); + BusyWait(10, [&] { return backpressure_options.toggle->IsOpen(); }); + ASSERT_TRUE(backpressure_options.toggle->IsOpen()); + + // Cleanup + batch_producer.producer().Push(IterationEnd>()); + plan->StopProducing(); + ASSERT_FINISHES_OK(plan->finished()); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 03fd7950ecd..964ade05d17 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -790,15 +790,13 @@ class PushGenerator { : backpressure(std::move(backpressure)) {} void OpenBackpressureIfFree() { - if (backpressure.toggle && !backpressure.toggle->IsOpen() && - result_q.size() < backpressure.resume_if_below) { + if (backpressure.toggle && result_q.size() < backpressure.resume_if_below) { backpressure.toggle->Open(); } } void CloseBackpressureIfFull() { - if (backpressure.toggle && backpressure.toggle->IsOpen() && - result_q.size() > backpressure.pause_if_above) { + if (backpressure.toggle && result_q.size() > backpressure.pause_if_above) { backpressure.toggle->Close(); } } @@ -898,6 +896,8 @@ class PushGenerator { if (!state_->result_q.empty()) { auto fut = Future::MakeFinished(std::move(state_->result_q.front())); state_->result_q.pop_front(); + // OpenBackpressureIfFree might trigger callbacks so release the lock first + lock.Unlock(); state_->OpenBackpressureIfFree(); return fut; } diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 38aba2f4353..f6d9ed3ad79 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -169,28 +169,27 @@ Future<> AsyncToggle::WhenOpen() { void 
AsyncToggle::Open() { util::Mutex::Guard guard = mutex_.Lock(); - if (when_open_.is_finished()) { + if (!closed_) { return; } - // This swap is needed to ensure we mark finished outside the lock. It does mean that - // later calls to Open might finish before earlier calls to Open but that should be ok. - Future<> to_finish = std::move(when_open_); - when_open_ = Future<>::MakeFinished(); + closed_ = false; + Future<> to_finish = when_open_; guard.Unlock(); to_finish.MarkFinished(); } void AsyncToggle::Close() { util::Mutex::Guard guard = mutex_.Lock(); - if (!when_open_.is_finished()) { + if (closed_) { return; } + closed_ = true; when_open_ = Future<>::Make(); } bool AsyncToggle::IsOpen() { util::Mutex::Guard guard = mutex_.Lock(); - return when_open_.is_finished(); + return !closed_; } BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below, diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index c71cae81bcf..b6ac38b7ecd 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -208,6 +208,9 @@ class ARROW_EXPORT AsyncToggle { void Close(); /// \brief Open the toggle /// + /// Note: This call may complete a future, triggering any callbacks, and generally + /// should not be done while holding any locks. + /// /// All current waiters will be released to enter, even if another close call /// quickly follows void Open(); @@ -217,6 +220,7 @@ class ARROW_EXPORT AsyncToggle { private: Future<> when_open_ = Future<>::MakeFinished(); + bool closed_ = false; util::Mutex mutex_; }; diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index eae4adfdfa1..c47fd0f79b6 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -19,6 +19,11 @@ #include +#include +#include +#include +#include + #include "arrow/result.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" From 63d61db8d7e8e152778f26d6ace860507513caa1 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 4 Oct 2021 13:53:03 -1000 Subject: [PATCH 05/13] ARROW-13611: Clarified ordering behavior in comments --- cpp/src/arrow/util/async_generator.h | 5 ++++- cpp/src/arrow/util/async_util.h | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 964ade05d17..cccc27a5bc7 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1700,7 +1700,10 @@ struct PauseableGenerator { /// \brief Allows an async generator to be paused /// -/// This generator is async-reentrant if the source is +/// This generator is NOT async-reentrant and calling it in an async-reentrant fashion +/// may lead to items getting reordered (and potentially truncated if the end token is +/// reordered ahead of valid items) +/// /// This generator forwards async-reentrant pressure template AsyncGenerator MakePauseable(AsyncGenerator source, diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index b6ac38b7ecd..46b10ccdb1e 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -211,6 +211,9 @@ class ARROW_EXPORT AsyncToggle { /// Note: This call may complete a future, triggering any callbacks, and generally /// should not be done while holding any locks. /// + /// Note: If Open is called from multiple threads it could lead to a situation where + /// callbacks from the second open finish before callbacks on the first open. 
+ /// /// All current waiters will be released to enter, even if another close call /// quickly follows void Open(); From ea34d174113def231329048e5ba559857851fcc5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 4 Oct 2021 14:32:37 -1000 Subject: [PATCH 06/13] ARROW-13611: Cleaned up the backpressure factory functions a little for simplicity and making it more obvious what is happening. Added backpressure to the OrderBy sink node --- cpp/src/arrow/compute/exec/sink_node.cc | 12 ++++++++---- cpp/src/arrow/compute/exec/sink_node_test.cc | 2 +- cpp/src/arrow/dataset/scanner.cc | 2 +- cpp/src/arrow/util/async_util.cc | 8 ++++++-- cpp/src/arrow/util/async_util.h | 8 +++++--- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index b1238a14bbb..1bb2680383c 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -237,8 +237,10 @@ class ConsumingSinkNode : public ExecNode { struct OrderBySinkNode final : public SinkNode { OrderBySinkNode(ExecPlan* plan, std::vector inputs, std::unique_ptr impl, - AsyncGenerator>* generator) - : SinkNode(plan, std::move(inputs), generator, {}), impl_{std::move(impl)} {} + AsyncGenerator>* generator, + util::BackpressureOptions backpressure) + : SinkNode(plan, std::move(inputs), generator, std::move(backpressure)), + impl_{std::move(impl)} {} const char* kind_name() const override { return "OrderBySinkNode"; } @@ -253,7 +255,8 @@ struct OrderBySinkNode final : public SinkNode { OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(), sink_options.sort_options)); return plan->EmplaceNode(plan, std::move(inputs), std::move(impl), - sink_options.generator); + sink_options.generator, + sink_options.backpressure); } // A sink node that receives inputs and then compute top_k/bottom_k. 
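For call sites, this cleanup swaps the free-function factory for static members
on BackpressureOptions. A minimal before/after sketch (the 32/64 defaults come
from the earlier MakeBackpressureOptions declaration; variable names are
illustrative):

    #include "arrow/util/async_util.h"

    // Before this commit (patch 01):
    auto old_style = arrow::util::MakeBackpressureOptions(/*resume_if_below=*/32,
                                                          /*pause_if_above=*/64);
    // After this commit:
    auto new_style = arrow::util::BackpressureOptions::Make(32, 64);
    // Default construction still disables backpressure;
    // BackpressureOptions::NoBackpressure() now states that intent explicitly.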
@@ -267,7 +270,8 @@ struct OrderBySinkNode final : public SinkNode { OrderByImpl::MakeSelectK(plan->exec_context(), inputs[0]->output_schema(), sink_options.select_k_options)); return plan->EmplaceNode(plan, std::move(inputs), std::move(impl), - sink_options.generator); + sink_options.generator, + sink_options.backpressure); } void InputReceived(ExecNode* input, ExecBatch batch) override { diff --git a/cpp/src/arrow/compute/exec/sink_node_test.cc b/cpp/src/arrow/compute/exec/sink_node_test.cc index 384a8302c4c..7d45ffc86d1 100644 --- a/cpp/src/arrow/compute/exec/sink_node_test.cc +++ b/cpp/src/arrow/compute/exec/sink_node_test.cc @@ -32,7 +32,7 @@ TEST(SinkNode, Backpressure) { PushGenerator> batch_producer; AsyncGenerator> sink_gen; util::BackpressureOptions backpressure_options = - util::MakeBackpressureOptions(kResumeIfBelow, kPauseIfAbove); + util::BackpressureOptions::Make(kResumeIfBelow, kPauseIfAbove); std::shared_ptr schema_ = schema({field("data", uint32())}); ARROW_EXPECT_OK(compute::Declaration::Sequence( { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 6650e46041c..81dc3e55072 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -599,7 +599,7 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( AsyncGenerator> sink_gen; util::BackpressureOptions backpressure = - util::MakeBackpressureOptions(kDefaultBackpressureLow, kDefaultBackpressureHigh); + util::BackpressureOptions::Make(kDefaultBackpressureLow, kDefaultBackpressureHigh); auto exprs = scan_options_->projection.call()->arguments; auto names = checked_cast( scan_options_->projection.call()->options.get()) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index f6d9ed3ad79..f5b9bdcbe6c 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -192,11 +192,15 @@ bool AsyncToggle::IsOpen() { return !closed_; } -BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below, - uint32_t pause_if_above) { +BackpressureOptions BackpressureOptions::Make(uint32_t resume_if_below, + uint32_t pause_if_above) { auto toggle = std::make_shared(); return BackpressureOptions{std::move(toggle), resume_if_below, pause_if_above}; } +BackpressureOptions BackpressureOptions::NoBackpressure() { + return BackpressureOptions(); +} + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index 46b10ccdb1e..29b21683099 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -244,13 +244,15 @@ struct ARROW_EXPORT BackpressureOptions { resume_if_below(resume_if_below), pause_if_above(pause_if_above) {} + static BackpressureOptions Make(uint32_t resume_if_below = 32, + uint32_t pause_if_above = 64); + + static BackpressureOptions NoBackpressure(); + std::shared_ptr toggle; uint32_t resume_if_below; uint32_t pause_if_above; }; -ARROW_EXPORT BackpressureOptions MakeBackpressureOptions(uint32_t resume_if_below = 32, - uint32_t pause_if_above = 64); - } // namespace util } // namespace arrow From 4a4cc8291a9fea2e99f45b3f2143919570963bab Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 6 Oct 2021 10:34:28 -1000 Subject: [PATCH 07/13] ARROW-13611: Added ThreadPool::WaitForIdle and used it to make the backpressure tests more predictable. In addition, my previous attempt at fixing the race condition between the producer and the async toggle wasn't correct. I improved it and now it should be more accurate. 
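The new ThreadPool::WaitForIdle gives tests a deterministic settle point in
place of sleeps and polling. Its usage in the updated scanner test looks like
this (GetCpuThreadPool lives in arrow::internal; TotalBatchesRead and
kDefaultBackpressureHigh come from the test fixture):

    #include "arrow/util/thread_pool.h"

    // Block until the pool has no tasks queued or running; by then the scanner
    // has either paused itself or drained its input, so counters can be
    // asserted directly instead of busy-waited.
    ::arrow::internal::GetCpuThreadPool()->WaitForIdle();
    ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh);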
--- cpp/src/arrow/dataset/scanner.h | 6 +++++ cpp/src/arrow/dataset/scanner_test.cc | 34 +++++++++++++++++++++------ cpp/src/arrow/util/async_generator.h | 12 +++++----- cpp/src/arrow/util/thread_pool.cc | 12 +++++++++- cpp/src/arrow/util/thread_pool.h | 3 +++ 5 files changed, 53 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 78746068d87..27d6f0e5e1c 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions { /// Fragment-specific scan options. std::shared_ptr fragment_scan_options; + /// Callback which will be run whenever the scanner pauses due to backpressure + /// + /// This is mostly for debugging & tracing so that the consumer can be notified if + /// they are not consuming data quickly enough. + std::function on_paused_callback; + // Return a vector of fields that requires materialization. // // This is usually the union of the fields referenced in the projection and the diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 4120014ea4a..b9e281ba297 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -39,6 +39,7 @@ #include "arrow/testing/matchers.h" #include "arrow/testing/util.h" #include "arrow/util/range.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/vector.h" using testing::ElementsAre; @@ -972,10 +973,10 @@ TEST_F(TestReordering, ScanBatchesUnordered) { class TestBackpressure : public ::testing::Test { protected: static constexpr int NFRAGMENTS = 10; - static constexpr int NBATCHES = 1000; + static constexpr int NBATCHES = 10; static constexpr int NROWS = 10; - FragmentVector MakeFragments() { + FragmentVector MakeFragmentsAndDeliverInitialBatches() { FragmentVector fragments; for (int i = 0; i < NFRAGMENTS; i++) { controlled_fragments_.emplace_back(std::make_shared(schema_)); @@ -993,8 +994,17 @@ class TestBackpressure : public ::testing::Test { return fragments; } + void DeliverAdditionalBatches() { + // Deliver a bunch of batches that should not be read in + for (int i = 1; i < NFRAGMENTS; i++) { + for (int j = 0; j < NBATCHES; j++) { + controlled_fragments_[i]->DeliverBatch(NROWS); + } + } + } + std::shared_ptr MakeDataset() { - FragmentVector fragments = MakeFragments(); + FragmentVector fragments = MakeFragmentsAndDeliverInitialBatches(); return std::make_shared(schema_, std::move(fragments)); } @@ -1026,13 +1036,20 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) { EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, scanner->ScanBatchesUnorderedAsync()); ASSERT_FINISHES_OK(gen()); + // Wait for the thread pool to idle. By this point the scanner should have paused itself + // This helps with timing on slower CI systems where there is only one core and the scanner + // might keep that core until it has scanned all the batches which never gives the sink a + // chance to report it is falling behind. + GetCpuThreadPool()->WaitForIdle(); + DeliverAdditionalBatches(); // The exact numbers may be imprecise due to threading but we should pretty quickly read // up to our backpressure limit and a little above. We should not be able to go too far // above. 
- BusyWait(10, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; }); + ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh); SleepABit(); - ASSERT_LT(TotalBatchesRead(), 2 * kDefaultBackpressureHigh); + // Worst case we read in the entire set of initial batches + ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1); } TEST_F(TestBackpressure, DISABLED_ScanBatchesOrdered) { @@ -1044,16 +1061,19 @@ TEST_F(TestBackpressure, DISABLED_ScanBatchesOrdered) { // we don't need it to finish. Future fut = gen(); + // See note on other test + GetCpuThreadPool()->WaitForIdle(); + // The exact numbers may be imprecise due to threading but we should pretty quickly read // up to our backpressure limit and a little above. We should not be able to go too far // above. - BusyWait(10, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; }); + ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh); // This can yield some false passes but it is tricky to test that a counter doesn't // increase over time. for (int i = 0; i < 20; i++) { SleepABit(); } - ASSERT_LT(TotalBatchesRead(), 2 * kDefaultBackpressureHigh); + ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1); } struct BatchConsumer { diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index cccc27a5bc7..d2a2339f5bc 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -789,13 +789,15 @@ class PushGenerator { explicit State(util::BackpressureOptions backpressure) : backpressure(std::move(backpressure)) {} - void OpenBackpressureIfFree() { + void OpenBackpressureIfFreeUnlocked(util::Mutex::Guard&& guard) { if (backpressure.toggle && result_q.size() < backpressure.resume_if_below) { + // Open might trigger callbacks so release the lock first + guard.Unlock(); backpressure.toggle->Open(); } } - void CloseBackpressureIfFull() { + void CloseBackpressureIfFullUnlocked() { if (backpressure.toggle && result_q.size() > backpressure.pause_if_above) { backpressure.toggle->Close(); } @@ -837,7 +839,7 @@ class PushGenerator { fut.MarkFinished(std::move(result)); } else { state->result_q.push_back(std::move(result)); - state->CloseBackpressureIfFull(); + state->CloseBackpressureIfFullUnlocked(); } return true; } @@ -896,9 +898,7 @@ class PushGenerator { if (!state_->result_q.empty()) { auto fut = Future::MakeFinished(std::move(state_->result_q.front())); state_->result_q.pop_front(); - // OpenBackpressureIfFree might trigger callbacks so release the lock first - lock.Unlock(); - state_->OpenBackpressureIfFree(); + state_->OpenBackpressureIfFreeUnlocked(std::move(lock)); return fut; } if (state_->finished) { diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 758295d01ed..823152702b8 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -121,6 +121,7 @@ struct ThreadPool::State { std::mutex mutex_; std::condition_variable cv_; std::condition_variable cv_shutdown_; + std::condition_variable cv_idle_; std::list workers_; // Trashcan for finished threads @@ -182,7 +183,9 @@ static void WorkerLoop(std::shared_ptr state, ARROW_UNUSED(std::move(task)); // release resources before waiting for lock lock.lock(); } - state->tasks_queued_or_running_--; + if ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0) { + state->cv_idle_.notify_all(); + } } // Now either the queue is empty *or* a quick shutdown was requested if (state->please_shutdown_ || should_secede()) { @@ -209,6 
+212,13 @@ static void WorkerLoop(std::shared_ptr state, } } +void ThreadPool::WaitForIdle() { + std::unique_lock lk(state_->mutex_); + state_->cv_idle_.wait(lk, [this] { + return state_->tasks_queued_or_running_ == 0; + }); +} + ThreadPool::ThreadPool() : sp_state_(std::make_shared()), state_(sp_state_.get()), diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 9ac8e36a3d8..b0f39fe9ce6 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -341,6 +341,9 @@ class ARROW_EXPORT ThreadPool : public Executor { // tasks are finished. Status Shutdown(bool wait = true); + + void WaitForIdle(); + struct State; protected: From 71d4c4244fc71f6d3cab9eacb862e72fd6af2b1a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 6 Oct 2021 11:02:28 -1000 Subject: [PATCH 08/13] ARROW-13611: General cleanup of the PR --- cpp/src/arrow/compute/exec/CMakeLists.txt | 1 - cpp/src/arrow/compute/exec/plan_test.cc | 50 +++++++++++++ cpp/src/arrow/compute/exec/sink_node_test.cc | 79 -------------------- cpp/src/arrow/dataset/scanner.h | 6 -- cpp/src/arrow/util/async_util_test.cc | 5 -- cpp/src/arrow/util/thread_pool.h | 4 +- 6 files changed, 53 insertions(+), 92 deletions(-) delete mode 100644 cpp/src/arrow/compute/exec/sink_node_test.cc diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 0fccb05d957..ccc36c093e8 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -26,7 +26,6 @@ add_arrow_compute_test(expression_test add_arrow_compute_test(plan_test PREFIX "arrow-compute") add_arrow_compute_test(hash_join_node_test PREFIX "arrow-compute") -add_arrow_compute_test(sink_node_test PREFIX "arrow-compute") add_arrow_compute_test(union_node_test PREFIX "arrow-compute") add_arrow_compute_test(util_test PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index ac81ed93904..c4ec36490f1 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -237,6 +237,56 @@ TEST(ExecPlanExecution, SourceSink) { } } +TEST(ExecPlanExecution, SinkNodeBackpressure) { + constexpr uint32_t kPauseIfAbove = 4; + constexpr uint32_t kResumeIfBelow = 2; + EXPECT_OK_AND_ASSIGN(std::shared_ptr plan, ExecPlan::Make()); + PushGenerator> batch_producer; + AsyncGenerator> sink_gen; + util::BackpressureOptions backpressure_options = + util::BackpressureOptions::Make(kResumeIfBelow, kPauseIfAbove); + std::shared_ptr schema_ = schema({field("data", uint32())}); + ARROW_EXPECT_OK(compute::Declaration::Sequence( + { + {"source", SourceNodeOptions(schema_, batch_producer)}, + {"sink", SinkNodeOptions{&sink_gen, backpressure_options}}, + }) + .AddToPlan(plan.get())); + ARROW_EXPECT_OK(plan->StartProducing()); + + EXPECT_OK_AND_ASSIGN(util::optional batch, ExecBatch::Make({MakeScalar(0)})); + ASSERT_TRUE(backpressure_options.toggle->IsOpen()); + + // Should be able to push kPauseIfAbove batches without triggering back pressure + for (uint32_t i = 0; i < kPauseIfAbove; i++) { + batch_producer.producer().Push(batch); + } + SleepABit(); + ASSERT_TRUE(backpressure_options.toggle->IsOpen()); + + // One more batch should trigger back pressure + batch_producer.producer().Push(batch); + BusyWait(10, [&] { return !backpressure_options.toggle->IsOpen(); }); + ASSERT_FALSE(backpressure_options.toggle->IsOpen()); + + // Reading as much as we can while keeping it paused + for (uint32_t i = 
kPauseIfAbove; i >= kResumeIfBelow; i--) { + ASSERT_FINISHES_OK(sink_gen()); + } + SleepABit(); + ASSERT_FALSE(backpressure_options.toggle->IsOpen()); + + // Reading one more item should open up backpressure + ASSERT_FINISHES_OK(sink_gen()); + BusyWait(10, [&] { return backpressure_options.toggle->IsOpen(); }); + ASSERT_TRUE(backpressure_options.toggle->IsOpen()); + + // Cleanup + batch_producer.producer().Push(IterationEnd>()); + plan->StopProducing(); + ASSERT_FINISHES_OK(plan->finished()); +} + TEST(ExecPlan, ToString) { auto basic_data = MakeBasicBatches(); AsyncGenerator> sink_gen; diff --git a/cpp/src/arrow/compute/exec/sink_node_test.cc b/cpp/src/arrow/compute/exec/sink_node_test.cc deleted file mode 100644 index 7d45ffc86d1..00000000000 --- a/cpp/src/arrow/compute/exec/sink_node_test.cc +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/testing/future_util.h" -#include "arrow/testing/gtest_util.h" - -#include "arrow/compute/exec/exec_plan.h" -#include "arrow/compute/exec/options.h" -#include "arrow/util/async_generator.h" - -namespace arrow { -namespace compute { - -TEST(SinkNode, Backpressure) { - constexpr uint32_t kPauseIfAbove = 4; - constexpr uint32_t kResumeIfBelow = 2; - EXPECT_OK_AND_ASSIGN(std::shared_ptr plan, ExecPlan::Make()); - PushGenerator> batch_producer; - AsyncGenerator> sink_gen; - util::BackpressureOptions backpressure_options = - util::BackpressureOptions::Make(kResumeIfBelow, kPauseIfAbove); - std::shared_ptr schema_ = schema({field("data", uint32())}); - ARROW_EXPECT_OK(compute::Declaration::Sequence( - { - {"source", SourceNodeOptions(schema_, batch_producer)}, - {"sink", SinkNodeOptions{&sink_gen, backpressure_options}}, - }) - .AddToPlan(plan.get())); - ARROW_EXPECT_OK(plan->StartProducing()); - - EXPECT_OK_AND_ASSIGN(util::optional batch, ExecBatch::Make({MakeScalar(0)})); - ASSERT_TRUE(backpressure_options.toggle->IsOpen()); - - // Should be able to push kPauseIfAbove batches without triggering back pressure - for (uint32_t i = 0; i < kPauseIfAbove; i++) { - batch_producer.producer().Push(batch); - } - SleepABit(); - ASSERT_TRUE(backpressure_options.toggle->IsOpen()); - - // One more batch should trigger back pressure - batch_producer.producer().Push(batch); - BusyWait(10, [&] { return !backpressure_options.toggle->IsOpen(); }); - ASSERT_FALSE(backpressure_options.toggle->IsOpen()); - - // Reading as much as we can while keeping it paused - for (uint32_t i = kPauseIfAbove; i >= kResumeIfBelow; i--) { - ASSERT_FINISHES_OK(sink_gen()); - } - SleepABit(); - ASSERT_FALSE(backpressure_options.toggle->IsOpen()); - - // Reading one more item should open up backpressure - ASSERT_FINISHES_OK(sink_gen()); - BusyWait(10, [&] { return 
backpressure_options.toggle->IsOpen(); }); - ASSERT_TRUE(backpressure_options.toggle->IsOpen()); - - // Cleanup - batch_producer.producer().Push(IterationEnd>()); - plan->StopProducing(); - ASSERT_FINISHES_OK(plan->finished()); -} - -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 27d6f0e5e1c..78746068d87 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -123,12 +123,6 @@ struct ARROW_DS_EXPORT ScanOptions { /// Fragment-specific scan options. std::shared_ptr fragment_scan_options; - /// Callback which will be run whenever the scanner pauses due to backpressure - /// - /// This is mostly for debugging & tracing so that the consumer can be notified if - /// they are not consuming data quickly enough. - std::function on_paused_callback; - // Return a vector of fields that requires materialization. // // This is usually the union of the fields referenced in the projection and the diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc index c47fd0f79b6..eae4adfdfa1 100644 --- a/cpp/src/arrow/util/async_util_test.cc +++ b/cpp/src/arrow/util/async_util_test.cc @@ -19,11 +19,6 @@ #include -#include -#include -#include -#include - #include "arrow/result.h" #include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index b0f39fe9ce6..4ed908d6f29 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -341,7 +341,9 @@ class ARROW_EXPORT ThreadPool : public Executor { // tasks are finished. Status Shutdown(bool wait = true); - + // Wait for the thread pool to become idle + // + // This is useful for sequencing tests void WaitForIdle(); struct State; From d898138b4a0e512251f4f0eda324b1ac52423554 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 6 Oct 2021 11:04:14 -1000 Subject: [PATCH 09/13] ARROW-13611: Moved from ASSERT_TRUE to ASSERT_GE for more explicit assert --- cpp/src/arrow/dataset/scanner_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index b9e281ba297..87ac23a78ca 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1046,7 +1046,7 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) { // The exact numbers may be imprecise due to threading but we should pretty quickly read // up to our backpressure limit and a little above. We should not be able to go too far // above. - ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh); + ASSERT_GE(TotalBatchesRead(), kDefaultBackpressureHigh); SleepABit(); // Worst case we read in the entire set of initial batches ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1); @@ -1067,7 +1067,7 @@ TEST_F(TestBackpressure, DISABLED_ScanBatchesOrdered) { // The exact numbers may be imprecise due to threading but we should pretty quickly read // up to our backpressure limit and a little above. We should not be able to go too far // above. - ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh); + ASSERT_GE(TotalBatchesRead(), kDefaultBackpressureHigh); // This can yield some false passes but it is tricky to test that a counter doesn't // increase over time. 
for (int i = 0; i < 20; i++) { From 8770a08a0d680190773ef575c456e5a05cc3f1eb Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 6 Oct 2021 13:05:08 -1000 Subject: [PATCH 10/13] ARROW-13611: Enhanced the backpressure test a bit more. Ran lint/format. Removed disabled ordered backpressure test because I was just updating the same thing twice. It can be added back in when ordering is supported --- cpp/src/arrow/dataset/scanner_test.cc | 41 ++++++--------------------- cpp/src/arrow/util/thread_pool.cc | 11 ++++--- 2 files changed, 14 insertions(+), 38 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 87ac23a78ca..63eeca008f9 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -973,7 +973,7 @@ TEST_F(TestReordering, ScanBatchesUnordered) { class TestBackpressure : public ::testing::Test { protected: static constexpr int NFRAGMENTS = 10; - static constexpr int NBATCHES = 10; + static constexpr int NBATCHES = 50; static constexpr int NROWS = 10; FragmentVector MakeFragmentsAndDeliverInitialBatches() { @@ -1036,43 +1036,20 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) { EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, scanner->ScanBatchesUnorderedAsync()); ASSERT_FINISHES_OK(gen()); - // Wait for the thread pool to idle. By this point the scanner should have paused itself - // This helps with timing on slower CI systems where there is only one core and the scanner - // might keep that core until it has scanned all the batches which never gives the sink a - // chance to report it is falling behind. - GetCpuThreadPool()->WaitForIdle(); - DeliverAdditionalBatches(); - // The exact numbers may be imprecise due to threading but we should pretty quickly read // up to our backpressure limit and a little above. We should not be able to go too far // above. + BusyWait(30, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; }); ASSERT_GE(TotalBatchesRead(), kDefaultBackpressureHigh); - SleepABit(); - // Worst case we read in the entire set of initial batches - ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1); -} - -TEST_F(TestBackpressure, DISABLED_ScanBatchesOrdered) { - std::shared_ptr scanner = MakeScanner(); - EXPECT_OK_AND_ASSIGN(AsyncGenerator gen, - scanner->ScanBatchesAsync()); - // This future never actually finishes because we only emit the first batch so far and - // the scanner delays by one batch. It is enough to start the system pumping though so - // we don't need it to finish. - Future fut = gen(); - - // See note on other test + // Wait for the thread pool to idle. By this point the scanner should have paused + // itself This helps with timing on slower CI systems where there is only one core and + // the scanner might keep that core until it has scanned all the batches which never + // gives the sink a chance to report it is falling behind. GetCpuThreadPool()->WaitForIdle(); + DeliverAdditionalBatches(); - // The exact numbers may be imprecise due to threading but we should pretty quickly read - // up to our backpressure limit and a little above. We should not be able to go too far - // above. - ASSERT_GE(TotalBatchesRead(), kDefaultBackpressureHigh); - // This can yield some false passes but it is tricky to test that a counter doesn't - // increase over time. 
-  for (int i = 0; i < 20; i++) {
-    SleepABit();
-  }
+  SleepABit();
+  // Worst case we read in the entire set of initial batches
   ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1);
 }

diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc
index 823152702b8..e90e71bda45 100644
--- a/cpp/src/arrow/util/thread_pool.cc
+++ b/cpp/src/arrow/util/thread_pool.cc
@@ -183,9 +183,10 @@ static void WorkerLoop(std::shared_ptr state,
       ARROW_UNUSED(std::move(task));  // release resources before waiting for lock
       lock.lock();
     }
-    if ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0) {
-      state->cv_idle_.notify_all();
-    }
+    if
+      ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0) {
+        state->cv_idle_.notify_all();
+      }
   }
   // Now either the queue is empty *or* a quick shutdown was requested
   if (state->please_shutdown_ || should_secede()) {
@@ -214,9 +215,7 @@ static void WorkerLoop(std::shared_ptr state,

 void ThreadPool::WaitForIdle() {
   std::unique_lock lk(state_->mutex_);
-  state_->cv_idle_.wait(lk, [this] {
-    return state_->tasks_queued_or_running_ == 0;
-  });
+  state_->cv_idle_.wait(lk, [this] { return state_->tasks_queued_or_running_ == 0; });
 }

 ThreadPool::ThreadPool()

From b9621f8e6b33db7f499abac2513906ee02e59502 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Wed, 6 Oct 2021 14:28:45 -1000
Subject: [PATCH 11/13] ARROW-13611: Fix memleak

---
 cpp/src/arrow/util/async_generator_test.cc | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index 297cc59233e..7709602e7f4 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -211,7 +211,11 @@ ReentrantCheckerGuard ExpectNotAccessedReentrantly(AsyncGenerator* generat
 }

 class GeneratorTestFixture : public ::testing::TestWithParam {
+ public:
+  virtual ~GeneratorTestFixture() override = default;
+
 protected:
+
   AsyncGenerator MakeSource(const std::vector& items) {
     std::vector wrapped(items.begin(), items.end());
     auto gen = AsyncVectorIt(std::move(wrapped));
@@ -1242,6 +1246,11 @@ INSTANTIATE_TEST_SUITE_P(EnumeratedTests, EnumeratorTestFixture,
                          ::testing::Values(false, true));

 class PauseableTestFixture : public GeneratorTestFixture {
+ public:
+  ~PauseableTestFixture() override {
+    generator_.producer().Close();
+  }
+
 protected:
   PauseableTestFixture() : toggle_(std::make_shared()) {
     sink_.clear();

From 9ff35754c51b456e9163da8941af80654d5e8d7a Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Wed, 6 Oct 2021 16:33:31 -1000
Subject: [PATCH 12/13] ARROW-13611: Lint and another memleak fix.

Turns out using PushGenerator as the source of an AsyncGenerator pipeline can be
somewhat problematic if the producer is not finished.
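To make the leak concrete: a PushGenerator's shared state owns the queued results and
the unfulfilled futures handed to consumers, so if a test tears down while the producer
side is still open, pending continuations (and whatever they capture) are never
released. The discipline this fix adopts is roughly the sketch below, written against
the PushGenerator/VisitAsyncGenerator utilities used elsewhere in this PR; the TestInt
payload and the surrounding scaffolding are illustrative only.

// Sketch: deterministic teardown of a PushGenerator-fed pipeline.
PushGenerator<TestInt> push_gen;
auto producer = push_gen.producer();
AsyncGenerator<TestInt> gen = push_gen;

producer.Push(TestInt(1));  // ...pipeline runs, consumers pull from gen...

// Without Close() the stream never sees an end marker, so the generator's
// shared state stays alive; the test fixtures in this PR call
// generator_.producer().Close() in their destructors for the same reason.
producer.Close();
ASSERT_FINISHES_OK(VisitAsyncGenerator(
    std::move(gen), [](TestInt) { return Status::OK(); }));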
--- cpp/src/arrow/dataset/scanner_test.cc | 10 ++++++++++ cpp/src/arrow/util/async_generator_test.cc | 7 ++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 63eeca008f9..40a0e005a3f 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -1027,6 +1027,14 @@ class TestBackpressure : public ::testing::Test { return sum; } + void Finish(AsyncGenerator gen) { + for (const auto& controlled_fragment : controlled_fragments_) { + controlled_fragment->Finish(); + } + ASSERT_FINISHES_OK(VisitAsyncGenerator( + gen, [](EnumeratedRecordBatch batch) { return Status::OK(); })); + } + std::shared_ptr schema_ = schema({field("values", int32())}); std::vector> controlled_fragments_; }; @@ -1051,6 +1059,8 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) { SleepABit(); // Worst case we read in the entire set of initial batches ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1); + + Finish(std::move(gen)); } struct BatchConsumer { diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 7709602e7f4..22f55d5cb20 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -212,10 +212,9 @@ ReentrantCheckerGuard ExpectNotAccessedReentrantly(AsyncGenerator* generat class GeneratorTestFixture : public ::testing::TestWithParam { public: - virtual ~GeneratorTestFixture() override = default; + ~GeneratorTestFixture() override = default; protected: - AsyncGenerator MakeSource(const std::vector& items) { std::vector wrapped(items.begin(), items.end()); auto gen = AsyncVectorIt(std::move(wrapped)); @@ -1247,9 +1246,7 @@ INSTANTIATE_TEST_SUITE_P(EnumeratedTests, EnumeratorTestFixture, class PauseableTestFixture : public GeneratorTestFixture { public: - ~PauseableTestFixture() override { - generator_.producer().Close(); - } + ~PauseableTestFixture() override { generator_.producer().Close(); } protected: PauseableTestFixture() : toggle_(std::make_shared()) { From 401e9060b755cccf3b5ee62ba56507a9a7d42eec Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 8 Oct 2021 16:17:31 -1000 Subject: [PATCH 13/13] ARROW-13611: formatting --- cpp/src/arrow/util/thread_pool.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index e90e71bda45..37132fe1a9c 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -183,10 +183,9 @@ static void WorkerLoop(std::shared_ptr state, ARROW_UNUSED(std::move(task)); // release resources before waiting for lock lock.lock(); } - if - ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0) { - state->cv_idle_.notify_all(); - } + if (ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0)) { + state->cv_idle_.notify_all(); + } } // Now either the queue is empty *or* a quick shutdown was requested if (state->please_shutdown_ || should_secede()) {
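Why this final formatting fix was needed at all: ARROW_PREDICT_FALSE(x) conventionally
expands to a parenthesized __builtin_expect expression, so the unparenthesized form
introduced in patch 10 still compiled, but clang-format no longer recognized it as an
if-condition and mangled the block into the odd indentation visible in the earlier
hunks. An explicit pair of parentheses keeps both the compiler and the formatter happy.
A hedged illustration follows (the quoted macro definition shows the conventional
shape, not a verbatim copy of arrow/util/macros.h):

// Conventionally, something like:
//   #define ARROW_PREDICT_FALSE(x) (__builtin_expect(!!(x), 0))
// so `if ARROW_PREDICT_FALSE(cond) {` parses as `if ((...)) {` -- legal C++,
// but opaque to clang-format. The explicit form is unambiguous:
if (ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0)) {
  state->cv_idle_.notify_all();  // wake any WaitForIdle() waiters
}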