Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions cpp/src/arrow/util/async_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,80 @@ AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
return MergedGenerator<T>(std::move(source), 1);
}

template <typename T>
struct Enumerated {
T value;
int index;
bool last;
};

template <typename T>
struct IterationTraits<Enumerated<T>> {
static Enumerated<T> End() { return Enumerated<T>{IterationEnd<T>(), -1, false}; }
static bool IsEnd(const Enumerated<T>& val) { return val.index < 0; }
};

/// \see MakeEnumeratedGenerator
template <typename T>
class EnumeratingGenerator {
public:
EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
: state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}

Future<Enumerated<T>> operator()() {
if (state_->finished) {
return AsyncGeneratorEnd<Enumerated<T>>();
} else {
auto state = state_;
return state->source().Then([state](const T& next) {
auto finished = IsIterationEnd<T>(next);
auto prev = Enumerated<T>{state->prev_value, state->prev_index, finished};
state->prev_value = next;
state->prev_index++;
state->finished = finished;
return prev;
});
}
}

private:
struct State {
State(AsyncGenerator<T> source, T initial_value)
: source(std::move(source)), prev_value(std::move(initial_value)), prev_index(0) {
finished = IsIterationEnd<T>(prev_value);
}

AsyncGenerator<T> source;
T prev_value;
int prev_index;
bool finished;
};

std::shared_ptr<State> state_;
};

/// Wraps items from a source generator with positional information
///
/// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be
/// processed in a "first-available" fashion and later resequenced which can reduce the
/// impact of sources with erratic performance (e.g. a filesystem where some items may
/// take longer to read than others).
///
/// TODO(ARROW-12371) Would require this generator be async-reentrant
///
/// \see MakeSequencingGenerator for an example of putting items back in order
///
/// This generator is not async-reentrant
///
/// This generator buffers one item (so it knows which item is the last item)
template <typename T>
AsyncGenerator<Enumerated<T>> MakeEnumeratedGenerator(AsyncGenerator<T> source) {
return FutureFirstGenerator<Enumerated<T>>(
source().Then([source](const T& initial_value) -> AsyncGenerator<Enumerated<T>> {
return EnumeratingGenerator<T>(std::move(source), initial_value);
}));
}

/// \see MakeTransferredGenerator
template <typename T>
class TransferringGenerator {
Expand Down
46 changes: 46 additions & 0 deletions cpp/src/arrow/util/async_generator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ class GeneratorTestFixture : public ::testing::TestWithParam<bool> {
return gen;
}

AsyncGenerator<TestInt> MakeEmptySource() { return MakeSource({}); }

AsyncGenerator<TestInt> MakeFailingSource() {
AsyncGenerator<TestInt> gen = [] {
return Future<TestInt>::MakeFinished(Status::Invalid("XYZ"));
Expand Down Expand Up @@ -1017,6 +1019,50 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
ASSERT_TRUE(IsIterationEnd(definitely_last));
}

class EnumeratorTestFixture : public GeneratorTestFixture {
protected:
void AssertEnumeratedCorrectly(AsyncGenerator<Enumerated<TestInt>>& gen,
int num_items) {
auto collected = CollectAsyncGenerator(gen);
ASSERT_FINISHES_OK_AND_ASSIGN(auto items, collected);
EXPECT_EQ(num_items, items.size());

for (const auto& item : items) {
ASSERT_EQ(item.index, item.value.value);
bool last = item.index == num_items - 1;
ASSERT_EQ(last, item.last);
}
AssertGeneratorExhausted(gen);
}
};

TEST_P(EnumeratorTestFixture, Basic) {
constexpr int NITEMS = 100;

auto source = MakeSource(RangeVector(NITEMS));
auto enumerated = MakeEnumeratedGenerator(std::move(source));

AssertEnumeratedCorrectly(enumerated, NITEMS);
}

TEST_P(EnumeratorTestFixture, Empty) {
auto source = MakeEmptySource();
auto enumerated = MakeEnumeratedGenerator(std::move(source));
AssertGeneratorExhausted(enumerated);
}

TEST_P(EnumeratorTestFixture, Error) {
auto source = FailsAt(MakeSource({1, 2, 3}), 1);
auto enumerated = MakeEnumeratedGenerator(std::move(source));

// Even though the first item finishes ok the enumerator buffers it. The error then
// takes priority over the buffered result.
ASSERT_FINISHES_AND_RAISES(Invalid, enumerated());
}

INSTANTIATE_TEST_SUITE_P(EnumeratedTests, EnumeratorTestFixture,
::testing::Values(false, true));

class SequencerTestFixture : public GeneratorTestFixture {
protected:
void RandomShuffle(std::vector<TestInt>& values) {
Expand Down