7 changes: 5 additions & 2 deletions cpp/src/arrow/compute/exec/options.h
@@ -26,6 +26,7 @@
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/util/async_util.h"
#include "arrow/util/optional.h"
#include "arrow/util/visibility.h"

@@ -110,10 +111,12 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
/// Emitted batches will not be ordered.
class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
public:
explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator)
: generator(generator) {}
explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator,
util::BackpressureOptions backpressure = {})
: generator(generator), backpressure(std::move(backpressure)) {}

std::function<Future<util::optional<ExecBatch>>()>* generator;
util::BackpressureOptions backpressure;
};

class ARROW_EXPORT SinkNodeConsumer {
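A minimal usage sketch (not part of this diff) of how a caller opts in to the new sink backpressure: the helper name, schema, generator, and threshold values below are illustrative assumptions, while the SinkNodeOptions and BackpressureOptions shapes mirror the header change above and the new test in plan_test.cc below.

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"

namespace cp = arrow::compute;

arrow::Status DeclareBackpressuredSink(
    cp::ExecPlan* plan, std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> batch_gen,
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* sink_gen) {
  // Close the toggle once more than 8 batches are queued in the sink and reopen it
  // when the consumer drains the queue back below 4 (values chosen for illustration).
  arrow::util::BackpressureOptions backpressure =
      arrow::util::BackpressureOptions::Make(/*resume_if_below=*/4, /*pause_if_above=*/8);
  ARROW_RETURN_NOT_OK(cp::Declaration::Sequence(
                          {
                              {"source", cp::SourceNodeOptions(schema, batch_gen)},
                              {"sink", cp::SinkNodeOptions{sink_gen, std::move(backpressure)}},
                          })
                          .AddToPlan(plan));
  return arrow::Status::OK();
}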
50 changes: 50 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
@@ -237,6 +237,56 @@ TEST(ExecPlanExecution, SourceSink) {
}
}

TEST(ExecPlanExecution, SinkNodeBackpressure) {
constexpr uint32_t kPauseIfAbove = 4;
constexpr uint32_t kResumeIfBelow = 2;
EXPECT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> plan, ExecPlan::Make());
PushGenerator<util::optional<ExecBatch>> batch_producer;
AsyncGenerator<util::optional<ExecBatch>> sink_gen;
util::BackpressureOptions backpressure_options =
util::BackpressureOptions::Make(kResumeIfBelow, kPauseIfAbove);
std::shared_ptr<Schema> schema_ = schema({field("data", uint32())});
ARROW_EXPECT_OK(compute::Declaration::Sequence(
{
{"source", SourceNodeOptions(schema_, batch_producer)},
{"sink", SinkNodeOptions{&sink_gen, backpressure_options}},
})
.AddToPlan(plan.get()));
ARROW_EXPECT_OK(plan->StartProducing());

EXPECT_OK_AND_ASSIGN(util::optional<ExecBatch> batch, ExecBatch::Make({MakeScalar(0)}));
ASSERT_TRUE(backpressure_options.toggle->IsOpen());

// Should be able to push kPauseIfAbove batches without triggering back pressure
for (uint32_t i = 0; i < kPauseIfAbove; i++) {
batch_producer.producer().Push(batch);
}
SleepABit();
ASSERT_TRUE(backpressure_options.toggle->IsOpen());

// One more batch should trigger back pressure
batch_producer.producer().Push(batch);
BusyWait(10, [&] { return !backpressure_options.toggle->IsOpen(); });
ASSERT_FALSE(backpressure_options.toggle->IsOpen());

// Reading as much as we can while keeping it paused
for (uint32_t i = kPauseIfAbove; i >= kResumeIfBelow; i--) {
ASSERT_FINISHES_OK(sink_gen());
}
SleepABit();
ASSERT_FALSE(backpressure_options.toggle->IsOpen());

// Reading one more item should release backpressure (reopen the toggle)
ASSERT_FINISHES_OK(sink_gen());
BusyWait(10, [&] { return backpressure_options.toggle->IsOpen(); });
ASSERT_TRUE(backpressure_options.toggle->IsOpen());

// Cleanup
batch_producer.producer().Push(IterationEnd<util::optional<ExecBatch>>());
plan->StopProducing();
ASSERT_FINISHES_OK(plan->finished());
}

TEST(ExecPlan, ToString) {
auto basic_data = MakeBasicBatches();
AsyncGenerator<util::optional<ExecBatch>> sink_gen;
25 changes: 16 additions & 9 deletions cpp/src/arrow/compute/exec/sink_node.cc
@@ -49,22 +49,25 @@ namespace {
class SinkNode : public ExecNode {
public:
SinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
AsyncGenerator<util::optional<ExecBatch>>* generator)
AsyncGenerator<util::optional<ExecBatch>>* generator,
util::BackpressureOptions backpressure)
: ExecNode(plan, std::move(inputs), {"collected"}, {},
/*num_outputs=*/0),
producer_(MakeProducer(generator)) {}
producer_(MakeProducer(generator, std::move(backpressure))) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "SinkNode"));

const auto& sink_options = checked_cast<const SinkNodeOptions&>(options);
return plan->EmplaceNode<SinkNode>(plan, std::move(inputs), sink_options.generator);
return plan->EmplaceNode<SinkNode>(plan, std::move(inputs), sink_options.generator,
sink_options.backpressure);
}

static PushGenerator<util::optional<ExecBatch>>::Producer MakeProducer(
AsyncGenerator<util::optional<ExecBatch>>* out_gen) {
PushGenerator<util::optional<ExecBatch>> push_gen;
AsyncGenerator<util::optional<ExecBatch>>* out_gen,
util::BackpressureOptions backpressure) {
PushGenerator<util::optional<ExecBatch>> push_gen(std::move(backpressure));
auto out = push_gen.producer();
*out_gen = std::move(push_gen);
return out;
@@ -234,8 +237,10 @@ class ConsumingSinkNode : public ExecNode {
struct OrderBySinkNode final : public SinkNode {
OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::unique_ptr<OrderByImpl> impl,
AsyncGenerator<util::optional<ExecBatch>>* generator)
: SinkNode(plan, std::move(inputs), generator), impl_{std::move(impl)} {}
AsyncGenerator<util::optional<ExecBatch>>* generator,
util::BackpressureOptions backpressure)
: SinkNode(plan, std::move(inputs), generator, std::move(backpressure)),
impl_{std::move(impl)} {}

const char* kind_name() const override { return "OrderBySinkNode"; }

@@ -250,7 +255,8 @@ struct OrderBySinkNode final : public SinkNode {
OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
sink_options.sort_options));
return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), std::move(impl),
sink_options.generator);
sink_options.generator,
sink_options.backpressure);
}

// A sink node that receives inputs and then computes top_k/bottom_k.
@@ -264,7 +270,8 @@ struct OrderBySinkNode final : public SinkNode {
OrderByImpl::MakeSelectK(plan->exec_context(), inputs[0]->output_schema(),
sink_options.select_k_options));
return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), std::move(impl),
sink_options.generator);
sink_options.generator,
sink_options.backpressure);
}

void InputReceived(ExecNode* input, ExecBatch batch) override {
26 changes: 17 additions & 9 deletions cpp/src/arrow/dataset/scanner.cc
@@ -598,20 +598,23 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(exec_context.get()));
AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;

util::BackpressureOptions backpressure =
util::BackpressureOptions::Make(kDefaultBackpressureLow, kDefaultBackpressureHigh);
auto exprs = scan_options_->projection.call()->arguments;
auto names = checked_cast<const compute::MakeStructOptions*>(
scan_options_->projection.call()->options.get())
->field_names;

RETURN_NOT_OK(compute::Declaration::Sequence(
{
{"scan", ScanNodeOptions{dataset_, scan_options_}},
{"filter", compute::FilterNodeOptions{scan_options_->filter}},
{"augmented_project",
compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"sink", compute::SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));
RETURN_NOT_OK(
compute::Declaration::Sequence(
{
{"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle}},
{"filter", compute::FilterNodeOptions{scan_options_->filter}},
{"augmented_project",
compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"sink", compute::SinkNodeOptions{&sink_gen, std::move(backpressure)}},
})
.AddToPlan(plan.get()));

RETURN_NOT_OK(plan->StartProducing());

@@ -1139,6 +1142,7 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
const auto& scan_node_options = checked_cast<const ScanNodeOptions&>(options);
auto scan_options = scan_node_options.scan_options;
auto dataset = scan_node_options.dataset;
const auto& backpressure_toggle = scan_node_options.backpressure_toggle;

if (!scan_options->use_async) {
return Status::NotImplemented("ScanNodes without asynchrony");
@@ -1201,6 +1205,10 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
return batch;
});

if (backpressure_toggle) {
gen = MakePauseable(gen, backpressure_toggle);
}

auto fields = scan_options->dataset_schema->fields();
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
12 changes: 9 additions & 3 deletions cpp/src/arrow/dataset/scanner.h
@@ -53,6 +53,8 @@ namespace dataset {
constexpr int64_t kDefaultBatchSize = 1 << 20;
constexpr int32_t kDefaultBatchReadahead = 32;
constexpr int32_t kDefaultFragmentReadahead = 8;
constexpr int32_t kDefaultBackpressureHigh = 64;
constexpr int32_t kDefaultBackpressureLow = 32;

/// Scan-specific options, which can be changed between scans of the same dataset.
struct ARROW_DS_EXPORT ScanOptions {
@@ -417,12 +419,16 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// ordering for simple ExecPlans.
class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options)
: dataset(std::move(dataset)), scan_options(std::move(scan_options)) {}
explicit ScanNodeOptions(
std::shared_ptr<Dataset> dataset, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<util::AsyncToggle> backpressure_toggle = NULLPTR)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
backpressure_toggle(std::move(backpressure_toggle)) {}

std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
std::shared_ptr<util::AsyncToggle> backpressure_toggle;
};

/// @}
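A compact sketch (not part of this diff) of the wiring that scanner.cc above now performs: one BackpressureOptions is created, its toggle is handed to the scan node, and the full options go to the sink, so the scan pauses once the sink queues more than kDefaultBackpressureHigh batches and resumes once the consumer drains it below kDefaultBackpressureLow. The helper name is an illustrative assumption.

#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/dataset/scanner.h"
#include "arrow/util/async_util.h"

namespace cp = arrow::compute;
namespace ds = arrow::dataset;

arrow::Status DeclareBackpressuredScan(
    cp::ExecPlan* plan, std::shared_ptr<ds::Dataset> dataset,
    std::shared_ptr<ds::ScanOptions> scan_options,
    arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>>* sink_gen) {
  arrow::util::BackpressureOptions backpressure = arrow::util::BackpressureOptions::Make(
      ds::kDefaultBackpressureLow, ds::kDefaultBackpressureHigh);
  ARROW_RETURN_NOT_OK(
      cp::Declaration::Sequence(
          {
              // The scan node only needs the shared toggle so it can pause its own
              // batch generator; the sink owns the thresholds and flips the toggle.
              {"scan", ds::ScanNodeOptions{dataset, scan_options, backpressure.toggle}},
              {"sink", cp::SinkNodeOptions{sink_gen, std::move(backpressure)}},
          })
          .AddToPlan(plan));
  return arrow::Status::OK();
}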
104 changes: 102 additions & 2 deletions cpp/src/arrow/dataset/scanner_test.cc
@@ -32,12 +32,14 @@
#include "arrow/dataset/test_util.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/testing/async_test_util.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/util.h"
#include "arrow/util/range.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/vector.h"

using testing::ElementsAre;
@@ -740,7 +742,9 @@ INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
class ControlledFragment : public Fragment {
public:
explicit ControlledFragment(std::shared_ptr<Schema> schema)
: Fragment(literal(true), std::move(schema)) {}
: Fragment(literal(true), std::move(schema)),
record_batch_generator_(),
tracking_generator_(record_batch_generator_) {}

Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
return Status::NotImplemented(
Expand All @@ -753,9 +757,11 @@ class ControlledFragment : public Fragment {

Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) override {
return record_batch_generator_;
return tracking_generator_;
};

int NumBatchesRead() { return tracking_generator_.num_read(); }

void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
void DeliverBatch(uint32_t num_rows) {
auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
@@ -764,6 +770,7 @@ class ControlledFragment : public Fragment {

private:
PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
util::TrackingGenerator<std::shared_ptr<RecordBatch>> tracking_generator_;
};

// TODO(ARROW-8163) Add testing for fragments arriving out of order
@@ -963,6 +970,99 @@ TEST_F(TestReordering, ScanBatchesUnordered) {
AssertBatchesInOrder(collected, {0, 0, 1, 1, 2}, {0, 2, 3, 1, 4});
}

class TestBackpressure : public ::testing::Test {
protected:
static constexpr int NFRAGMENTS = 10;
static constexpr int NBATCHES = 50;
static constexpr int NROWS = 10;

FragmentVector MakeFragmentsAndDeliverInitialBatches() {
FragmentVector fragments;
for (int i = 0; i < NFRAGMENTS; i++) {
controlled_fragments_.emplace_back(std::make_shared<ControlledFragment>(schema_));
fragments.push_back(controlled_fragments_[i]);
// We only emit one batch on the first fragment. This triggers the sequencing
// generator to dig really deep to try and find the second batch
int num_to_emit = NBATCHES;
if (i == 0) {
num_to_emit = 1;
}
for (int j = 0; j < num_to_emit; j++) {
controlled_fragments_[i]->DeliverBatch(NROWS);
}
}
return fragments;
}

void DeliverAdditionalBatches() {
// Deliver a bunch of batches that should not be read in
for (int i = 1; i < NFRAGMENTS; i++) {
for (int j = 0; j < NBATCHES; j++) {
controlled_fragments_[i]->DeliverBatch(NROWS);
}
}
}

std::shared_ptr<Dataset> MakeDataset() {
FragmentVector fragments = MakeFragmentsAndDeliverInitialBatches();
return std::make_shared<FragmentDataset>(schema_, std::move(fragments));
}

std::shared_ptr<Scanner> MakeScanner() {
std::shared_ptr<Dataset> dataset = MakeDataset();
std::shared_ptr<ScanOptions> options = std::make_shared<ScanOptions>();
ScannerBuilder builder(std::move(dataset), options);
ARROW_EXPECT_OK(builder.UseThreads(true));
ARROW_EXPECT_OK(builder.UseAsync(true));
ARROW_EXPECT_OK(builder.FragmentReadahead(4));
EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
return scanner;
}

int TotalBatchesRead() {
int sum = 0;
for (const auto& controlled_fragment : controlled_fragments_) {
sum += controlled_fragment->NumBatchesRead();
}
return sum;
}

void Finish(AsyncGenerator<EnumeratedRecordBatch> gen) {
for (const auto& controlled_fragment : controlled_fragments_) {
controlled_fragment->Finish();
}
ASSERT_FINISHES_OK(VisitAsyncGenerator(
gen, [](EnumeratedRecordBatch batch) { return Status::OK(); }));
}

Review comment (Member):
Hmm, I would expect dropping the generator/scanner to clean up everything. Wonder what's keeping it alive. (Not a problem to solve here.)

Reply (Member Author):
A minimal example of the problem is:

TEST(Weston, MemTest) {
  PushGenerator<util::optional<int>> int_prod;
  AsyncGenerator<util::optional<int>> int_gen = int_prod;
  Future<> visit_fut = VisitAsyncGenerator(std::move(int_gen), [] (util::optional<int>) {
    return Status::OK();
  });
}

The above will leak. I have a fix in mind but will address it in a separate PR.
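One workaround for the leak shown above, sketched here as an assumption rather than the author's planned fix (the test name is hypothetical), is the pattern Finish() uses in this test fixture: close the PushGenerator's producer before dropping it, which ends the stream and lets the future returned by VisitAsyncGenerator complete and release its state.

TEST(Weston, MemTestWorkaround) {
  PushGenerator<util::optional<int>> int_prod;
  AsyncGenerator<util::optional<int>> int_gen = int_prod;
  Future<> visit_fut = VisitAsyncGenerator(std::move(int_gen), [] (util::optional<int>) {
    return Status::OK();
  });
  // Signal end-of-stream; the pending visit resolves instead of leaking its state.
  ARROW_UNUSED(int_prod.producer().Close());
  ASSERT_FINISHES_OK(visit_fut);
}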

std::shared_ptr<Schema> schema_ = schema({field("values", int32())});
std::vector<std::shared_ptr<ControlledFragment>> controlled_fragments_;
};

TEST_F(TestBackpressure, ScanBatchesUnordered) {
std::shared_ptr<Scanner> scanner = MakeScanner();
EXPECT_OK_AND_ASSIGN(AsyncGenerator<EnumeratedRecordBatch> gen,
scanner->ScanBatchesUnorderedAsync());
ASSERT_FINISHES_OK(gen());
// The exact numbers may be imprecise due to threading but we should pretty quickly read
// up to our backpressure limit and a little above. We should not be able to go too far
// above.
BusyWait(30, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; });
ASSERT_GE(TotalBatchesRead(), kDefaultBackpressureHigh);
// Wait for the thread pool to idle. By this point the scanner should have paused
// itself. This helps with timing on slower CI systems where there is only one core and
// the scanner might keep that core until it has scanned all the batches, which never
// gives the sink a chance to report it is falling behind.
GetCpuThreadPool()->WaitForIdle();
DeliverAdditionalBatches();

SleepABit();
// Worst case we read in the entire set of initial batches
ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1);

Finish(std::move(gen));
}

struct BatchConsumer {
explicit BatchConsumer(EnumeratedRecordBatchGenerator generator)
: generator(std::move(generator)), next() {}