Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions cpp/src/arrow/testing/async_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,37 @@
#include <atomic>
#include <memory>

#include "arrow/testing/gtest_util.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"

namespace arrow {
namespace util {

template <typename T>
AsyncGenerator<T> AsyncVectorIt(std::vector<T> v) {
return MakeVectorGenerator(std::move(v));
}

template <typename T>
AsyncGenerator<T> FailAt(AsyncGenerator<T> src, int failing_index) {
auto index = std::make_shared<std::atomic<int>>(0);
return [src, index, failing_index]() {
auto idx = index->fetch_add(1);
if (idx >= failing_index) {
return Future<T>::MakeFinished(Status::Invalid("XYZ"));
}
return src();
};
}

template <typename T>
AsyncGenerator<T> SlowdownABit(AsyncGenerator<T> source) {
return MakeMappedGenerator(std::move(source), [](const T& res) {
return SleepABitAsync().Then([res]() { return res; });
});
}

template <typename T>
class TrackingGenerator {
public:
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ add_arrow_test(threading-utility-test
counting_semaphore_test.cc
future_test.cc
task_group_test.cc
test_common.cc
thread_pool_test.cc)

add_arrow_benchmark(bit_block_counter_benchmark)
Expand Down
95 changes: 36 additions & 59 deletions cpp/src/arrow/util/async_generator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,6 @@

namespace arrow {

template <typename T>
AsyncGenerator<T> AsyncVectorIt(std::vector<T> v) {
return MakeVectorGenerator(std::move(v));
}

template <typename T>
AsyncGenerator<T> FailsAt(AsyncGenerator<T> src, int failing_index) {
auto index = std::make_shared<std::atomic<int>>(0);
return [src, index, failing_index]() {
auto idx = index->fetch_add(1);
if (idx >= failing_index) {
return Future<T>::MakeFinished(Status::Invalid("XYZ"));
}
return src();
};
}

template <typename T>
AsyncGenerator<T> SlowdownABit(AsyncGenerator<T> source) {
return MakeMappedGenerator(std::move(source), [](const T& res) {
return SleepABitAsync().Then([res]() { return res; });
});
}

template <typename T>
AsyncGenerator<T> MakeJittery(AsyncGenerator<T> source) {
auto latency_generator = arrow::io::LatencyGenerator::Make(0.01);
Expand Down Expand Up @@ -217,9 +193,9 @@ class GeneratorTestFixture : public ::testing::TestWithParam<bool> {
protected:
AsyncGenerator<TestInt> MakeSource(const std::vector<TestInt>& items) {
std::vector<TestInt> wrapped(items.begin(), items.end());
auto gen = AsyncVectorIt(std::move(wrapped));
auto gen = util::AsyncVectorIt(std::move(wrapped));
if (IsSlow()) {
return SlowdownABit(std::move(gen));
return util::SlowdownABit(std::move(gen));
}
return gen;
}
Expand All @@ -231,7 +207,7 @@ class GeneratorTestFixture : public ::testing::TestWithParam<bool> {
return Future<TestInt>::MakeFinished(Status::Invalid("XYZ"));
};
if (IsSlow()) {
return SlowdownABit(std::move(gen));
return util::SlowdownABit(std::move(gen));
}
return gen;
}
Expand Down Expand Up @@ -324,7 +300,7 @@ class ManualGenerator {
};

TEST(TestAsyncUtil, Visit) {
auto generator = AsyncVectorIt<TestInt>({1, 2, 3});
auto generator = util::AsyncVectorIt<TestInt>({1, 2, 3});
unsigned int sum = 0;
auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt item) {
sum += item.value;
Expand All @@ -336,15 +312,15 @@ TEST(TestAsyncUtil, Visit) {

TEST(TestAsyncUtil, Collect) {
std::vector<TestInt> expected = {1, 2, 3};
auto generator = AsyncVectorIt(expected);
auto generator = util::AsyncVectorIt(expected);
auto collected = CollectAsyncGenerator(generator);
ASSERT_FINISHES_OK_AND_ASSIGN(auto collected_val, collected);
ASSERT_EQ(expected, collected_val);
}

TEST(TestAsyncUtil, Map) {
std::vector<TestInt> input = {1, 2, 3};
auto generator = AsyncVectorIt(input);
auto generator = util::AsyncVectorIt(input);
std::function<TestStr(const TestInt&)> mapper = [](const TestInt& in) {
return std::to_string(in.value);
};
Expand All @@ -355,7 +331,7 @@ TEST(TestAsyncUtil, Map) {

TEST(TestAsyncUtil, MapAsync) {
std::vector<TestInt> input = {1, 2, 3};
auto generator = AsyncVectorIt(input);
auto generator = util::AsyncVectorIt(input);
std::function<Future<TestStr>(const TestInt&)> mapper = [](const TestInt& in) {
return SleepAsync(1e-3).Then([in]() { return TestStr(std::to_string(in.value)); });
};
Expand All @@ -366,7 +342,7 @@ TEST(TestAsyncUtil, MapAsync) {

TEST(TestAsyncUtil, MapReentrant) {
std::vector<TestInt> input = {1, 2};
auto source = AsyncVectorIt(input);
auto source = util::AsyncVectorIt(input);
util::TrackingGenerator<TestInt> tracker(std::move(source));
source = MakeTransferredGenerator(AsyncGenerator<TestInt>(tracker),
internal::GetCpuThreadPool());
Expand Down Expand Up @@ -408,7 +384,7 @@ TEST(TestAsyncUtil, MapParallelStress) {
constexpr int NITEMS = 10;
for (int i = 0; i < NTASKS; i++) {
auto gen = MakeVectorGenerator(RangeVector(NITEMS));
gen = SlowdownABit(std::move(gen));
gen = util::SlowdownABit(std::move(gen));
auto guard = ExpectNotAccessedReentrantly(&gen);
std::function<TestStr(const TestInt&)> mapper = [](const TestInt& in) {
SleepABit();
Expand All @@ -427,9 +403,9 @@ TEST(TestAsyncUtil, MapQueuingFailStress) {
for (bool slow : {true, false}) {
for (int i = 0; i < NTASKS; i++) {
std::shared_ptr<std::atomic<bool>> done = std::make_shared<std::atomic<bool>>();
auto inner = AsyncVectorIt(RangeVector(NITEMS));
auto inner = util::AsyncVectorIt(RangeVector(NITEMS));
if (slow) inner = MakeJittery(inner);
auto gen = FailsAt(inner, NITEMS / 2);
auto gen = util::FailAt(inner, NITEMS / 2);
std::function<TestStr(const TestInt&)> mapper = [done](const TestInt& in) {
if (done->load()) {
ADD_FAILURE() << "Callback called after generator sent end signal";
Expand All @@ -446,7 +422,7 @@ TEST(TestAsyncUtil, MapQueuingFailStress) {

TEST(TestAsyncUtil, MapTaskFail) {
std::vector<TestInt> input = {1, 2, 3};
auto generator = AsyncVectorIt(input);
auto generator = util::AsyncVectorIt(input);
std::function<Result<TestStr>(const TestInt&)> mapper =
[](const TestInt& in) -> Result<TestStr> {
if (in.value == 2) {
Expand Down Expand Up @@ -492,7 +468,7 @@ TEST(TestAsyncUtil, MapTaskDelayedFail) {

TEST(TestAsyncUtil, MapSourceFail) {
std::vector<TestInt> input = {1, 2, 3};
auto generator = FailsAt(AsyncVectorIt(input), 1);
auto generator = util::FailAt(util::AsyncVectorIt(input), 1);
std::function<Result<TestStr>(const TestInt&)> mapper =
[](const TestInt& in) -> Result<TestStr> {
return TestStr(std::to_string(in.value));
Expand All @@ -505,8 +481,8 @@ TEST(TestAsyncUtil, Concatenated) {
std::vector<TestInt> inputOne{1, 2, 3};
std::vector<TestInt> inputTwo{4, 5, 6};
std::vector<TestInt> expected{1, 2, 3, 4, 5, 6};
auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
{AsyncVectorIt<TestInt>(inputOne), AsyncVectorIt<TestInt>(inputTwo)});
auto gen = util::AsyncVectorIt<AsyncGenerator<TestInt>>(
{util::AsyncVectorIt<TestInt>(inputOne), util::AsyncVectorIt<TestInt>(inputTwo)});
auto concat = MakeConcatenatedGenerator(gen);
AssertAsyncGeneratorMatch(expected, concat);
}
Expand All @@ -523,7 +499,7 @@ TEST_P(FromFutureFixture, Basic) {
auto to_gen = source.Then([slow](const std::vector<TestInt>& vec) {
auto vec_gen = MakeVectorGenerator(vec);
if (slow) {
return SlowdownABit(std::move(vec_gen));
return util::SlowdownABit(std::move(vec_gen));
}
return vec_gen;
});
Expand All @@ -538,7 +514,7 @@ INSTANTIATE_TEST_SUITE_P(FromFutureTests, FromFutureFixture,
class MergedGeneratorTestFixture : public GeneratorTestFixture {};

TEST_P(MergedGeneratorTestFixture, Merged) {
auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
auto gen = util::AsyncVectorIt<AsyncGenerator<TestInt>>(
{MakeSource({1, 2, 3}), MakeSource({4, 5, 6})});

auto concat_gen = MakeMergedGenerator(gen, 10);
Expand All @@ -552,9 +528,9 @@ TEST_P(MergedGeneratorTestFixture, Merged) {
}

TEST_P(MergedGeneratorTestFixture, OuterSubscriptionEmpty) {
auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>({});
auto gen = util::AsyncVectorIt<AsyncGenerator<TestInt>>({});
if (IsSlow()) {
gen = SlowdownABit(gen);
gen = util::SlowdownABit(gen);
}
auto merged_gen = MakeMergedGenerator(gen, 10);
ASSERT_FINISHES_OK_AND_ASSIGN(auto collected,
Expand All @@ -563,8 +539,9 @@ TEST_P(MergedGeneratorTestFixture, OuterSubscriptionEmpty) {
}

TEST_P(MergedGeneratorTestFixture, MergedInnerFail) {
auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
{MakeSource({1, 2, 3}), FailsAt(MakeSource({1, 2, 3}), 1), MakeSource({1, 2, 3})});
auto gen = util::AsyncVectorIt<AsyncGenerator<TestInt>>(
{MakeSource({1, 2, 3}), util::FailAt(MakeSource({1, 2, 3}), 1),
MakeSource({1, 2, 3})});
auto merged_gen = MakeMergedGenerator(gen, 10);
// Merged generator can be pulled async-reentrantly and we need to make
// sure, if it is, that all futures are marked complete, even if there is an error
Expand Down Expand Up @@ -672,23 +649,23 @@ TEST_P(MergedGeneratorTestFixture, MergedInnerFailCleanup) {

TEST_P(MergedGeneratorTestFixture, FinishesQuickly) {
// Testing a source that finishes on the first pull
auto source = AsyncVectorIt<AsyncGenerator<TestInt>>({MakeSource({1})});
auto source = util::AsyncVectorIt<AsyncGenerator<TestInt>>({MakeSource({1})});
auto merged = MakeMergedGenerator(std::move(source), 10);
ASSERT_FINISHES_OK_AND_EQ(TestInt(1), merged());
AssertGeneratorExhausted(merged);
}

TEST_P(MergedGeneratorTestFixture, MergedOuterFail) {
auto gen =
FailsAt(AsyncVectorIt<AsyncGenerator<TestInt>>(
{MakeSource({1, 2, 3}), MakeSource({1, 2, 3}), MakeSource({1, 2, 3})}),
1);
auto gen = util::FailAt(
util::AsyncVectorIt<AsyncGenerator<TestInt>>(
{MakeSource({1, 2, 3}), MakeSource({1, 2, 3}), MakeSource({1, 2, 3})}),
1);
auto merged_gen = MakeMergedGenerator(gen, 10);
ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(merged_gen));
}

TEST_P(MergedGeneratorTestFixture, MergedLimitedSubscriptions) {
auto gen = AsyncVectorIt<AsyncGenerator<TestInt>>(
auto gen = util::AsyncVectorIt<AsyncGenerator<TestInt>>(
{MakeSource({1, 2}), MakeSource({3, 4}), MakeSource({5, 6, 7, 8}),
MakeSource({9, 10, 11, 12})});
util::TrackingGenerator<AsyncGenerator<TestInt>> tracker(std::move(gen));
Expand Down Expand Up @@ -739,7 +716,7 @@ TEST_P(MergedGeneratorTestFixture, MergedStress) {
guards.push_back(ExpectNotAccessedReentrantly(&source));
sources.push_back(source);
}
AsyncGenerator<AsyncGenerator<TestInt>> source_gen = AsyncVectorIt(sources);
AsyncGenerator<AsyncGenerator<TestInt>> source_gen = util::AsyncVectorIt(sources);
auto outer_gaurd = ExpectNotAccessedReentrantly(&source_gen);

auto merged = MakeMergedGenerator(source_gen, 4);
Expand All @@ -756,7 +733,7 @@ TEST_P(MergedGeneratorTestFixture, MergedParallelStress) {
for (int j = 0; j < NGENERATORS; j++) {
sources.push_back(MakeSource(RangeVector(NITEMS)));
}
auto merged = MakeMergedGenerator(AsyncVectorIt(sources), 4);
auto merged = MakeMergedGenerator(util::AsyncVectorIt(sources), 4);
merged = MakeReadaheadGenerator(merged, 4);
ASSERT_FINISHES_OK_AND_ASSIGN(auto items, CollectAsyncGenerator(merged));
ASSERT_EQ(NITEMS * NGENERATORS, items.size());
Expand Down Expand Up @@ -1448,7 +1425,7 @@ TEST(TestAsyncUtil, ReadaheadOneItem) {
}

TEST(TestAsyncUtil, ReadaheadCopy) {
auto source = AsyncVectorIt<TestInt>(RangeVector(6));
auto source = util::AsyncVectorIt<TestInt>(RangeVector(6));
auto gen = MakeReadaheadGenerator(std::move(source), 2);

for (int i = 0; i < 2; i++) {
Expand All @@ -1466,7 +1443,7 @@ TEST(TestAsyncUtil, ReadaheadCopy) {
}

TEST(TestAsyncUtil, ReadaheadMove) {
auto source = AsyncVectorIt<TestInt>(RangeVector(6));
auto source = util::AsyncVectorIt<TestInt>(RangeVector(6));
auto gen = MakeReadaheadGenerator(std::move(source), 2);

for (int i = 0; i < 2; i++) {
Expand Down Expand Up @@ -1600,7 +1577,7 @@ TEST_P(EnumeratorTestFixture, Empty) {
}

TEST_P(EnumeratorTestFixture, Error) {
auto source = FailsAt(MakeSource({1, 2, 3}), 1);
auto source = util::FailAt(MakeSource({1, 2, 3}), 1);
auto enumerated = MakeEnumeratedGenerator(std::move(source));

// Even though the first item finishes ok the enumerator buffers it. The error then
Expand Down Expand Up @@ -1666,7 +1643,7 @@ class PauseableTestFixture : public GeneratorTestFixture {
AsyncGenerator<TestInt> GetSource() {
const auto& source = static_cast<AsyncGenerator<TestInt>>(generator_);
if (IsSlow()) {
return SlowdownABit(source);
return util::SlowdownABit(source);
} else {
return source;
}
Expand Down Expand Up @@ -1741,7 +1718,7 @@ TEST_P(SequencerTestFixture, SequenceLambda) {
TEST_P(SequencerTestFixture, SequenceError) {
{
auto original = MakeSource({6, 4, 2});
original = FailsAt(original, 1);
original = util::FailAt(original, 1);
auto sequenced = MakeSequencingGenerator(original, cmp_, is_next_, TestInt(0));
auto collected = CollectAsyncGenerator(sequenced);
ASSERT_FINISHES_AND_RAISES(Invalid, collected);
Expand Down Expand Up @@ -1824,7 +1801,7 @@ INSTANTIATE_TEST_SUITE_P(SequencerTests, SequencerTestFixture,
::testing::Values(false, true));

TEST(TestAsyncIteratorTransform, SkipSome) {
auto original = AsyncVectorIt<TestInt>({1, 2, 3});
auto original = util::AsyncVectorIt<TestInt>({1, 2, 3});
auto filter = MakeFilter([](TestInt& t) { return t.value != 2; });
auto filtered = MakeTransformedGenerator(std::move(original), filter);
AssertAsyncGeneratorMatch({"1", "3"}, std::move(filtered));
Expand Down
Loading