From b9a6c7f05de41eadac81046adea018282fa5907b Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Apr 2021 17:09:39 -1000 Subject: [PATCH 1/4] ARROW-12287: Adds an enumerating generator which tags items with their index as well as whether or not they were the last item in the sequence. This is needed for reassembly potentially out of order record batches during scan. --- cpp/src/arrow/util/async_generator.h | 80 ++++++++++++++++++++++ cpp/src/arrow/util/async_generator_test.cc | 47 +++++++++++++ 2 files changed, 127 insertions(+) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index db98243267b..85a147a75e0 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1063,6 +1063,86 @@ AsyncGenerator MakeConcatenatedGenerator(AsyncGenerator> so return MergedGenerator(std::move(source), 1); } +template +struct Enumerated { + util::optional value; + int index; + bool last; +}; + +template +struct IterationTraits> { + static Enumerated End() { return Enumerated{{}, -1, false}; } + static bool IsEnd(const Enumerated& val) { return !val.value.has_value(); } +}; + +/// \see MakeEnumeratedGenerator +template +class EnumeratingGenerator { + public: + EnumeratingGenerator(AsyncGenerator source, T initial_value) + : state_(std::make_shared(std::move(source), std::move(initial_value))) {} + + Future> operator()() { + if (state_->finished) { + return AsyncGeneratorEnd>(); + } else { + auto state = state_; + return state->source().Then([state](const T& next) { + auto finished = IsIterationEnd(next); + auto prev = Enumerated{state->prev_value, state->prev_index, finished}; + state->prev_value = next; + state->prev_index++; + state->finished = finished; + return prev; + }); + } + } + + private: + struct State { + State(AsyncGenerator source, T initial_value) + : source(std::move(source)), prev_value(std::move(initial_value)), prev_index(0) { + finished = IsIterationEnd(prev_value); + } + + AsyncGenerator source; + T prev_value; + int prev_index; + bool finished; + }; + + std::shared_ptr state_; +}; + +/// Wraps items from a source generator with positional information +/// +/// When reqsequencing items from multiple streams that have been merged into +/// one it helps to know when an item is the last item in the stream. +/// +/// Note: Another potential use for this could be resequencing items from a +/// jittery source. However, the readahead generator will not emit items out of +/// order today so this is not needed. Furthermore, this generator would need to +/// support async reentrancy which, while possible, is not done currently. +/// +/// Note: Since this generator is not actually taking in out-of-order sources it isn't +/// strictly neccesary to add the index, it could be added by a map generator. However, +/// since this generator is usually used as laster input to the sequencing generator and +/// the sequencing generator needs the index we go ahead and add it for utility's sake +/// +/// \see MakeSequencingGenerator for an example of putting items back in order +/// +/// This generator is not async-reentrant +/// +/// This generator buffers one item (so it knows which item is the last item) +template +AsyncGenerator> MakeEnumeratedGenerator(AsyncGenerator source) { + return FutureFirstGenerator>( + source().Then([source](const T& initial_value) -> AsyncGenerator> { + return EnumeratingGenerator(std::move(source), initial_value); + })); +} + /// \see MakeTransferredGenerator template class TransferringGenerator { diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 518422de586..3467d121720 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -229,6 +229,8 @@ class GeneratorTestFixture : public ::testing::TestWithParam { return gen; } + AsyncGenerator MakeEmptySource() { return MakeSource({}); } + AsyncGenerator MakeFailingSource() { AsyncGenerator gen = [] { return Future::MakeFinished(Status::Invalid("XYZ")); @@ -1017,6 +1019,51 @@ TEST(TestAsyncUtil, ReadaheadFailed) { ASSERT_TRUE(IsIterationEnd(definitely_last)); } +class EnumeratorTestFixture : public GeneratorTestFixture { + protected: + void AssertEnumeratedCorrectly(AsyncGenerator>& gen, + int num_items) { + auto collected = CollectAsyncGenerator(gen); + ASSERT_FINISHES_OK_AND_ASSIGN(auto items, collected); + EXPECT_EQ(num_items, items.size()); + + for (const auto& item : items) { + ASSERT_TRUE(item.value.has_value()); + ASSERT_EQ(item.index, (*item.value).value); + bool last = item.index == num_items - 1; + ASSERT_EQ(last, item.last); + } + AssertGeneratorExhausted(gen); + } +}; + +TEST_P(EnumeratorTestFixture, Basic) { + constexpr int NITEMS = 100; + + auto source = MakeSource(RangeVector(NITEMS)); + auto enumerated = MakeEnumeratedGenerator(std::move(source)); + + AssertEnumeratedCorrectly(enumerated, NITEMS); +} + +TEST_P(EnumeratorTestFixture, Empty) { + auto source = MakeEmptySource(); + auto enumerated = MakeEnumeratedGenerator(std::move(source)); + AssertGeneratorExhausted(enumerated); +} + +TEST_P(EnumeratorTestFixture, Error) { + auto source = FailsAt(MakeSource({1, 2, 3}), 1); + auto enumerated = MakeEnumeratedGenerator(std::move(source)); + + // Even though the first item finishes ok the enumerator buffers it. The error then + // takes priority over the buffered result. + ASSERT_FINISHES_AND_RAISES(Invalid, enumerated()); +} + +INSTANTIATE_TEST_SUITE_P(EnumeratedTests, EnumeratorTestFixture, + ::testing::Values(false, true)); + class SequencerTestFixture : public GeneratorTestFixture { protected: void RandomShuffle(std::vector& values) { From 280fd496c7942e14a687439715a0e0a3ab2629fc Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 8 Apr 2021 23:07:14 -1000 Subject: [PATCH 2/4] ARROW-12287: From PR: typo and switch to using T instead of util::optional --- cpp/src/arrow/util/async_generator.h | 8 ++++---- cpp/src/arrow/util/async_generator_test.cc | 3 +-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 85a147a75e0..8d46791f2a4 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1065,15 +1065,15 @@ AsyncGenerator MakeConcatenatedGenerator(AsyncGenerator> so template struct Enumerated { - util::optional value; + T value; int index; bool last; }; template struct IterationTraits> { - static Enumerated End() { return Enumerated{{}, -1, false}; } - static bool IsEnd(const Enumerated& val) { return !val.value.has_value(); } + static Enumerated End() { return Enumerated{IterationEnd(), -1, false}; } + static bool IsEnd(const Enumerated& val) { return val.index < 0; } }; /// \see MakeEnumeratedGenerator @@ -1127,7 +1127,7 @@ class EnumeratingGenerator { /// /// Note: Since this generator is not actually taking in out-of-order sources it isn't /// strictly neccesary to add the index, it could be added by a map generator. However, -/// since this generator is usually used as laster input to the sequencing generator and +/// since this generator is usually used as later input to the sequencing generator and /// the sequencing generator needs the index we go ahead and add it for utility's sake /// /// \see MakeSequencingGenerator for an example of putting items back in order diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 3467d121720..62ba35e2f7e 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -1028,8 +1028,7 @@ class EnumeratorTestFixture : public GeneratorTestFixture { EXPECT_EQ(num_items, items.size()); for (const auto& item : items) { - ASSERT_TRUE(item.value.has_value()); - ASSERT_EQ(item.index, (*item.value).value); + ASSERT_EQ(item.index, item.value.value); bool last = item.index == num_items - 1; ASSERT_EQ(last, item.last); } From 0cb8672bd5d10c16f0a8829a2d9afbc01f34707f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 13 Apr 2021 11:04:21 -1000 Subject: [PATCH 3/4] ARROW-12287: Cleaned up the description of enumerating generator based on PR comments --- cpp/src/arrow/util/async_generator.h | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 8d46791f2a4..dcc0d4025a7 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1117,18 +1117,12 @@ class EnumeratingGenerator { /// Wraps items from a source generator with positional information /// -/// When reqsequencing items from multiple streams that have been merged into -/// one it helps to know when an item is the last item in the stream. +/// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be +/// processed in a "first-available" fashion and later reqsequenced which can reduce the +/// impact of sources with erratic performance (e.g. a filesystem where some items may +/// take longer to read than others). /// -/// Note: Another potential use for this could be resequencing items from a -/// jittery source. However, the readahead generator will not emit items out of -/// order today so this is not needed. Furthermore, this generator would need to -/// support async reentrancy which, while possible, is not done currently. -/// -/// Note: Since this generator is not actually taking in out-of-order sources it isn't -/// strictly neccesary to add the index, it could be added by a map generator. However, -/// since this generator is usually used as later input to the sequencing generator and -/// the sequencing generator needs the index we go ahead and add it for utility's sake +/// TODO(ARROW-12371) Would require this generator be async-reentrant /// /// \see MakeSequencingGenerator for an example of putting items back in order /// From 94fae0a138d2d66b13861f1370e08c59cfa0b847 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 14 Apr 2021 08:29:13 -0400 Subject: [PATCH 4/4] Update cpp/src/arrow/util/async_generator.h --- cpp/src/arrow/util/async_generator.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index dcc0d4025a7..a95efb2365b 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1118,7 +1118,7 @@ class EnumeratingGenerator { /// Wraps items from a source generator with positional information /// /// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be -/// processed in a "first-available" fashion and later reqsequenced which can reduce the +/// processed in a "first-available" fashion and later resequenced which can reduce the /// impact of sources with erratic performance (e.g. a filesystem where some items may /// take longer to read than others). ///