Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this<AsyncSc
Result<TaggedRecordBatchGenerator> ScanBatchesAsync(Executor* executor);
Future<> VisitBatchesAsync(std::function<Status(TaggedRecordBatch)> visitor,
Executor* executor);
Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync(Executor* executor);
Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync(
Executor* executor, bool sequence_fragments = false);
Future<std::shared_ptr<Table>> ToTableAsync(Executor* executor);

Result<FragmentGenerator> GetFragments() const;
Expand Down Expand Up @@ -587,7 +588,7 @@ Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
}

Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
Executor* cpu_executor) {
Executor* cpu_executor, bool sequence_fragments) {
if (!scan_options_->use_threads) {
cpu_executor = nullptr;
}
Expand All @@ -608,7 +609,8 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
RETURN_NOT_OK(
compute::Declaration::Sequence(
{
{"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle}},
{"scan", ScanNodeOptions{dataset_, scan_options_, backpressure.toggle,
sequence_fragments}},
{"filter", compute::FilterNodeOptions{scan_options_->filter}},
{"augmented_project",
compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
Expand Down Expand Up @@ -649,7 +651,8 @@ Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {

Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
Executor* cpu_executor) {
ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync(cpu_executor));
ARROW_ASSIGN_OR_RAISE(auto unordered, ScanBatchesUnorderedAsync(
cpu_executor, /*sequence_fragments=*/true));
// We need an initial value sentinel, so we use one with fragment.index < 0
auto is_before_any = [](const EnumeratedRecordBatch& batch) {
return batch.fragment.index < 0;
Expand Down Expand Up @@ -1143,6 +1146,7 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
auto scan_options = scan_node_options.scan_options;
auto dataset = scan_node_options.dataset;
const auto& backpressure_toggle = scan_node_options.backpressure_toggle;
bool require_sequenced_output = scan_node_options.require_sequenced_output;

if (!scan_options->use_async) {
return Status::NotImplemented("ScanNodes without asynchrony");
Expand Down Expand Up @@ -1175,8 +1179,15 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
FragmentsToBatches(std::move(fragment_gen), scan_options));

auto merged_batch_gen =
MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead);
AsyncGenerator<EnumeratedRecordBatch> merged_batch_gen;
if (require_sequenced_output) {
ARROW_ASSIGN_OR_RAISE(merged_batch_gen,
MakeSequencedMergedGenerator(std::move(batch_gen_gen),
scan_options->fragment_readahead));
} else {
merged_batch_gen =
MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead);
}

auto batch_gen = MakeReadaheadGenerator(std::move(merged_batch_gen),
scan_options->fragment_readahead);
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,17 @@ class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions {
public:
explicit ScanNodeOptions(
std::shared_ptr<Dataset> dataset, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<util::AsyncToggle> backpressure_toggle = NULLPTR)
std::shared_ptr<util::AsyncToggle> backpressure_toggle = NULLPTR,
bool require_sequenced_output = false)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
backpressure_toggle(std::move(backpressure_toggle)) {}
backpressure_toggle(std::move(backpressure_toggle)),
require_sequenced_output(require_sequenced_output) {}

std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
std::shared_ptr<util::AsyncToggle> backpressure_toggle;
bool require_sequenced_output;
};

/// @}
Expand Down
24 changes: 21 additions & 3 deletions cpp/src/arrow/dataset/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1027,12 +1027,12 @@ class TestBackpressure : public ::testing::Test {
return sum;
}

void Finish(AsyncGenerator<EnumeratedRecordBatch> gen) {
template <typename T>
void Finish(AsyncGenerator<T> gen) {
for (const auto& controlled_fragment : controlled_fragments_) {
controlled_fragment->Finish();
}
ASSERT_FINISHES_OK(VisitAsyncGenerator(
gen, [](EnumeratedRecordBatch batch) { return Status::OK(); }));
ASSERT_FINISHES_OK(VisitAsyncGenerator(gen, [](T batch) { return Status::OK(); }));
}

std::shared_ptr<Schema> schema_ = schema({field("values", int32())});
Expand Down Expand Up @@ -1063,6 +1063,24 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) {
Finish(std::move(gen));
}

// Ordered counterpart of the ScanBatchesUnordered backpressure test: the scan is
// driven through ScanBatchesAsync() and the number of batches read while the
// consumer is stalled is checked against an upper bound.
TEST_F(TestBackpressure, ScanBatchesOrdered) {
std::shared_ptr<Scanner> scanner = MakeScanner();
EXPECT_OK_AND_ASSIGN(AsyncGenerator<TaggedRecordBatch> gen,
scanner->ScanBatchesAsync());
// This future never actually finishes because we only emit the first batch so far and
// the scanner delays by one batch. It is enough to start the system pumping though so
// we don't need it to finish.
// `fut` is intentionally never awaited; the single call to gen() is what matters.
Future<TaggedRecordBatch> fut = gen();

// See note on other test
// Let the CPU thread pool drain before counting how many batches were read.
GetCpuThreadPool()->WaitForIdle();
// Worst case we read in the entire set of initial batches
// NOTE(review): the bound below is NBATCHES * (NFRAGMENTS - 1) + 1; confirm this
// matches the number of batches the fixture makes available up front.
ASSERT_LE(TotalBatchesRead(), NBATCHES * (NFRAGMENTS - 1) + 1);

// Release further batches, then finish the controlled fragments and drain the
// generator via the fixture's Finish() helper.
DeliverAdditionalBatches();
Finish(std::move(gen));
}

struct BatchConsumer {
explicit BatchConsumer(EnumeratedRecordBatchGenerator generator)
: generator(std::move(generator)), next() {}
Expand Down
84 changes: 65 additions & 19 deletions cpp/src/arrow/util/async_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ Future<> VisitAsyncGenerator(AsyncGenerator<T> generator, Visitor visitor) {
return Loop(LoopBody{std::move(generator), std::move(visitor)});
}

/// \brief Waits for an async generator to complete, discarding results.
/// \brief Wait for an async generator to complete, discarding results.
template <typename T>
Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) {
std::function<Status(T)> visitor = [](const T&) { return Status::OK(); };
return VisitAsyncGenerator(generator, visitor);
}

/// \brief Collects the results of an async generator into a vector
/// \brief Collect the results of an async generator into a vector
template <typename T>
Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
auto vec = std::make_shared<std::vector<T>>();
Expand Down Expand Up @@ -258,7 +258,7 @@ class MappingGenerator {
std::shared_ptr<State> state_;
};

/// \brief Creates a generator that will apply the map function to each element of
/// \brief Create a generator that will apply the map function to each element of
/// source. The map function is not called on the end token.
///
/// Note: This function makes a copy of `map` for each item
Expand All @@ -278,7 +278,7 @@ AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, MapFn
return MappingGenerator<T, V>(std::move(source_generator), MapCallback{std::move(map)});
}

/// \brief Creates a generator that will apply the map function to
/// \brief Create a generator that will apply the map function to
/// each element of source. The map function is not called on the end
/// token. The result of the map function should be another
/// generator; all these generators will then be flattened to produce
Expand Down Expand Up @@ -417,7 +417,7 @@ class SequencingGenerator {
const std::shared_ptr<State> state_;
};

/// \brief Buffers an AsyncGenerator to return values in sequence order ComesAfter
/// \brief Buffer an AsyncGenerator to return values in sequence order.  ComesAfter
/// and IsNext determine the sequence order.
///
/// ComesAfter should be a BinaryPredicate that only returns true if a comes after b
Expand Down Expand Up @@ -532,7 +532,7 @@ class TransformingGenerator {
std::shared_ptr<TransformingGeneratorState> state_;
};

/// \brief Transforms an async generator using a transformer function returning a new
/// \brief Transform an async generator using a transformer function returning a new
/// AsyncGenerator
///
/// The transform function here behaves exactly the same as the transform function in
Expand Down Expand Up @@ -686,7 +686,7 @@ class FutureFirstGenerator {
std::shared_ptr<State> state_;
};

/// \brief Transforms a Future<AsyncGenerator<T>> into an AsyncGenerator<T>
/// \brief Transform a Future<AsyncGenerator<T>> into an AsyncGenerator<T>
/// that waits for the future to complete as part of the first item.
///
/// This generator is not async-reentrant (even if the generator yielded by future is)
Expand All @@ -697,7 +697,7 @@ AsyncGenerator<T> MakeFromFuture(Future<AsyncGenerator<T>> future) {
return FutureFirstGenerator<T>(std::move(future));
}

/// \brief Creates a generator that will pull from the source into a queue. Unlike
/// \brief Create a generator that will pull from the source into a queue. Unlike
/// MakeReadaheadGenerator this will not pull reentrantly from the source.
///
/// The source generator does not need to be async-reentrant
Expand All @@ -711,6 +711,35 @@ AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generato
return SerialReadaheadGenerator<T>(std::move(source_generator), max_readahead);
}

/// \brief Create a generator that eagerly requests the first item from its source
///
/// Generators normally stay idle until somebody asks them for an item.  This one
/// breaks that convention: generator() is invoked a single time before this
/// function returns.  Apart from that the returned generator mirrors the source.
///
/// This generator forwards async-reentrant pressure to the source
/// This generator buffers one item (the first result) until it is delivered.
template <typename T>
AsyncGenerator<T> MakeAutoStartingGenerator(AsyncGenerator<T> generator) {
  struct AutostartGenerator {
    Future<T> operator()() {
      // After the eagerly requested future has been handed out the slot no
      // longer holds a valid future, so every later call goes to the source.
      if (!first_future->is_valid()) {
        return source();
      }
      Future<T> eager = *first_future;
      // Reset the slot to a default-constructed future so it is delivered once.
      *first_future = Future<T>();
      return eager;
    }

    // Shared so that copies of this generator observe the same slot.
    std::shared_ptr<Future<T>> first_future;
    AsyncGenerator<T> source;
  };

  // Pull from the source right away; the resulting future is buffered until the
  // first call on the returned generator.
  auto eager_slot = std::make_shared<Future<T>>(generator());
  return AutostartGenerator{std::move(eager_slot), std::move(generator)};
}

/// \see MakeReadaheadGenerator
template <typename T>
class ReadaheadGenerator {
Expand Down Expand Up @@ -919,7 +948,7 @@ class PushGenerator {
const std::shared_ptr<State> state_;
};

/// \brief Creates a generator that pulls reentrantly from a source
/// \brief Create a generator that pulls reentrantly from a source
/// This generator will pull reentrantly from a source, ensuring that max_readahead
/// requests are active at any given time.
///
Expand Down Expand Up @@ -1137,7 +1166,7 @@ class MergedGenerator {
std::shared_ptr<State> state_;
};

/// \brief Creates a generator that takes in a stream of generators and pulls from up to
/// \brief Create a generator that takes in a stream of generators and pulls from up to
/// max_subscriptions at a time
///
/// Note: This may deliver items out of sequence. For example, items from the third
Expand All @@ -1156,7 +1185,24 @@ AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
return MergedGenerator<T>(std::move(source), max_subscriptions);
}

/// \brief Creates a generator that takes in a stream of generators and pulls from each
/// \brief Create a generator that takes in a stream of generators and emits their
/// items in sequence, while starting later sub-generators ahead of time
///
/// Each sub-generator pulled from source is wrapped with MakeAutoStartingGenerator,
/// so it is asked for its first item as soon as it is pulled.  Sub-generators are
/// pulled through a serial readahead of max_subscriptions - 1 and are then consumed
/// one after another via MakeConcatenatedGenerator.
///
/// \param source the stream of sub-generators
/// \param max_subscriptions must be at least 2
/// \return Status::Invalid if max_subscriptions is not positive, or if it is 1 (in
///         which case MakeConcatenatedGenerator should be used directly)
template <typename T>
Result<AsyncGenerator<T>> MakeSequencedMergedGenerator(
    AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions) {
  // Zero must be rejected too: it is not a positive integer and would otherwise
  // produce a readahead of -1 below.
  if (max_subscriptions <= 0) {
    return Status::Invalid("max_subscriptions must be a positive integer");
  }
  if (max_subscriptions == 1) {
    return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1");
  }
  AsyncGenerator<AsyncGenerator<T>> autostarting_source = MakeMappedGenerator(
      std::move(source),
      [](const AsyncGenerator<T>& sub) { return MakeAutoStartingGenerator(sub); });
  // One sub-generator is being consumed by the concatenation, so the readahead
  // queue only needs to hold the remaining max_subscriptions - 1.
  AsyncGenerator<AsyncGenerator<T>> sub_readahead =
      MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1);
  return MakeConcatenatedGenerator(std::move(sub_readahead));
}

/// \brief Create a generator that takes in a stream of generators and pulls from each
/// one in sequence.
///
/// This generator is async-reentrant but will never pull from source reentrantly and
Expand Down Expand Up @@ -1224,7 +1270,7 @@ class EnumeratingGenerator {
std::shared_ptr<State> state_;
};

/// Wraps items from a source generator with positional information
/// Wrap items from a source generator with positional information
///
/// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be
/// processed in a "first-available" fashion and later resequenced which can reduce the
Expand Down Expand Up @@ -1260,7 +1306,7 @@ class TransferringGenerator {
internal::Executor* executor_;
};

/// \brief Transfers a future to an underlying executor.
/// \brief Transfer a future to an underlying executor.
///
/// Continuations run on the returned future will be run on the given executor
/// if they cannot be run synchronously.
Expand Down Expand Up @@ -1506,7 +1552,7 @@ class BackgroundGenerator {
constexpr int kDefaultBackgroundMaxQ = 32;
constexpr int kDefaultBackgroundQRestart = 16;

/// \brief Creates an AsyncGenerator<T> by iterating over an Iterator<T> on a background
/// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> on a background
/// thread
///
/// The parameter max_q and q_restart control queue size and background thread task
Expand Down Expand Up @@ -1554,14 +1600,14 @@ class GeneratorIterator {
AsyncGenerator<T> source_;
};

/// \brief Converts an AsyncGenerator<T> to an Iterator<T> by blocking until each future
/// \brief Convert an AsyncGenerator<T> to an Iterator<T> which blocks until each future
/// is finished
template <typename T>
Iterator<T> MakeGeneratorIterator(AsyncGenerator<T> source) {
return Iterator<T>(GeneratorIterator<T>(std::move(source)));
}

/// \brief Adds readahead to an iterator using a background thread.
/// \brief Add readahead to an iterator using a background thread.
///
/// Under the hood this is converting the iterator to a generator using
/// MakeBackgroundGenerator, adding readahead to the converted generator with
Expand Down Expand Up @@ -1633,7 +1679,7 @@ AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
return MakeFailingGenerator<T>(result.status());
}

/// \brief Prepends initial_values onto a generator
/// \brief Prepend initial_values onto a generator
///
/// This generator is async-reentrant but will buffer requests and will not
/// pull from following_values async-reentrantly.
Expand All @@ -1659,7 +1705,7 @@ struct CancellableGenerator {
StopToken stop_token;
};

/// \brief Allows an async generator to be cancelled
/// \brief Allow an async generator to be cancelled
///
/// This generator is async-reentrant
template <typename T>
Expand Down Expand Up @@ -1698,7 +1744,7 @@ struct PauseableGenerator {
std::shared_ptr<PauseableGeneratorState> state_;
};

/// \brief Allows an async generator to be paused
/// \brief Allow an async generator to be paused
///
/// This generator is NOT async-reentrant and calling it in an async-reentrant fashion
/// may lead to items getting reordered (and potentially truncated if the end token is
Expand Down
Loading