From 07f18cb229041688c06f1f211e6c6ad7fed65271 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Tue, 27 May 2025 12:31:31 +0100 Subject: [PATCH] fix --- cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 2 +- cpp/velox/tests/VeloxShuffleWriterTest.cc | 136 ++++++++++---------- 2 files changed, 72 insertions(+), 66 deletions(-) diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc index 3bf39683a7d6..83dc41fdb9b8 100644 --- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc @@ -160,7 +160,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr facebook::velox::row::CompactRow row(vector); if (fixedRowSize_.has_value()) { - rowSize_.resize(inputRows, fixedRowSize_.value() + sizeof(RowSizeType)); + rowSize_.resize(inputRows, fixedRowSize_.value()); } else { rowSize_.resize(inputRows); rowSizePrefixSum_.resize(inputRows + 1); diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 53ef82a02143..1024dafcfb57 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -290,7 +290,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam file_; }; -class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest { +class SinglePartitioningShuffleWriterTest : public VeloxShuffleWriterTest { protected: std::shared_ptr createShuffleWriter(uint32_t) override { auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool(); @@ -311,7 +311,7 @@ class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest { } }; -class HashPartitioningShuffleWriter : public VeloxShuffleWriterTest { +class HashPartitioningShuffleWriterTest : public VeloxShuffleWriterTest { protected: void SetUp() override { VeloxShuffleWriterTest::SetUp(); @@ -351,7 +351,7 @@ class HashPartitioningShuffleWriter : public VeloxShuffleWriterTest { facebook::velox::RowVectorPtr hashInputVector2_; }; -class RangePartitioningShuffleWriter : public VeloxShuffleWriterTest { +class RangePartitioningShuffleWriterTest : public VeloxShuffleWriterTest { protected: void SetUp() override { VeloxShuffleWriterTest::SetUp(); @@ -394,7 +394,7 @@ class RangePartitioningShuffleWriter : public VeloxShuffleWriterTest { std::shared_ptr compositeBatch2_; }; -class RoundRobinPartitioningShuffleWriter : public VeloxShuffleWriterTest { +class RoundRobinPartitioningShuffleWriterTest : public VeloxShuffleWriterTest { protected: std::shared_ptr createShuffleWriter(uint32_t numPartitions) override { auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool(); @@ -420,7 +420,7 @@ class RoundRobinPartitioningShuffleWriter : public VeloxShuffleWriterTest { } }; -TEST_P(SinglePartitioningShuffleWriter, single) { +TEST_P(SinglePartitioningShuffleWriterTest, single) { if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) { return; } @@ -476,57 +476,63 @@ TEST_P(SinglePartitioningShuffleWriter, single) { } } -TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) { +TEST_P(HashPartitioningShuffleWriterTest, hashPart1Vector) { ASSERT_NOT_OK(initShuffleWriterOptions()); - auto shuffleWriter = createShuffleWriter(2); - auto vector = makeRowVector({ - makeFlatVector({1, 2, 1, 2}), - makeNullableFlatVector({1, 2, 3, std::nullopt}), - makeFlatVector({1, 2, 3, 4}), - makeFlatVector({"nn", "re", "fr", "juiu"}), - makeFlatVector({232, 34567235, 1212, 4567}, DECIMAL(12, 4)), - makeFlatVector({232, 34567235, 1212, 4567}, DECIMAL(20, 4)), - makeFlatVector( - 4, [](vector_size_t row) { return row % 2; }, nullEvery(5), DATE()), - makeFlatVector( - 4, - [](vector_size_t row) { - return Timestamp{row % 2, 0}; - }, - nullEvery(5)), - }); - - auto rowType = facebook::velox::asRowType(vector->type()); - auto children = rowType->children(); - auto names = rowType->names(); - children.erase(children.begin()); - names.erase(names.begin()); - auto dataType = facebook::velox::ROW(std::move(names), std::move(children)); - - auto firstBlock = makeRowVector({ - makeNullableFlatVector({2, std::nullopt}), - makeFlatVector({2, 4}), - makeFlatVector({"re", "juiu"}), - makeFlatVector({34567235, 4567}, DECIMAL(12, 4)), - makeFlatVector({34567235, 4567}, DECIMAL(20, 4)), - makeFlatVector({1, 1}, DATE()), - makeFlatVector({Timestamp(1, 0), Timestamp(1, 0)}), - }); - - auto secondBlock = makeRowVector({ - makeNullableFlatVector({1, 3}), - makeFlatVector({1, 3}), - makeFlatVector({"nn", "fr"}), - makeFlatVector({232, 1212}, DECIMAL(12, 4)), - makeFlatVector({232, 1212}, DECIMAL(20, 4)), - makeNullableFlatVector({std::nullopt, 0}, DATE()), - makeNullableFlatVector({std::nullopt, Timestamp(0, 0)}), - }); - - testShuffleRoundTrip(*shuffleWriter, {vector}, 2, {{firstBlock}, {secondBlock}}); + + // Fixed-length input. + { + auto shuffleWriter = createShuffleWriter(2); + + std::vector data = { + makeNullableFlatVector({1, 2, 3, std::nullopt}), + makeNullableFlatVector({1, 2, 3, 4}), + makeFlatVector({1, 2, 3, 4}), + makeFlatVector({232, 34567235, 1212, 4567}, DECIMAL(12, 4)), + makeFlatVector({232, 34567235, 1212, 4567}, DECIMAL(20, 4)), + makeFlatVector( + 4, [](vector_size_t row) { return row % 2; }, nullEvery(5), DATE()), + makeFlatVector( + 4, + [](vector_size_t row) { + return Timestamp{row % 2, 0}; + }, + nullEvery(5))}; + + const auto vector = makeRowVector(data); + + const auto blocksPid0 = takeRows({vector}, {{1, 3}}); + const auto blocksPid1 = takeRows({vector}, {{0, 2}}); + + // Add partition id as the first column. + data.insert(data.begin(), makeFlatVector({1, 2, 1, 2})); + const auto input = makeRowVector(data); + + testShuffleRoundTrip(*shuffleWriter, {input}, 2, {blocksPid0, blocksPid1}); + } + + // Variable-length input. + { + auto shuffleWriter = createShuffleWriter(2); + + std::vector data = { + makeFlatVector({"nn", "", "fr", "juiu"}), + makeNullableFlatVector({1, 2, 3, std::nullopt}), + makeNullableFlatVector({std::nullopt, "de", "10 I'm not inline string", "de"})}; + + const auto vector = makeRowVector(data); + + const auto blocksPid0 = takeRows({vector}, {{0, 1, 3}}); + const auto blocksPid1 = takeRows({vector}, {{2}}); + + // Add partition id as the first column. + data.insert(data.begin(), makeFlatVector({2, 2, 1, 2})); + const auto input = makeRowVector(data); + + testShuffleRoundTrip(*shuffleWriter, {input}, 2, {blocksPid0, blocksPid1}); + } } -TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) { +TEST_P(HashPartitioningShuffleWriterTest, hashPart1VectorComplexType) { ASSERT_NOT_OK(initShuffleWriterOptions()); auto shuffleWriter = createShuffleWriter(2); auto children = childrenComplex_; @@ -538,7 +544,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) { testShuffleRoundTrip(*shuffleWriter, {vector}, 2, {firstBlock, secondBlock}); } -TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) { +TEST_P(HashPartitioningShuffleWriterTest, hashPart3Vectors) { ASSERT_NOT_OK(initShuffleWriterOptions()); auto shuffleWriter = createShuffleWriter(2); @@ -549,7 +555,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) { *shuffleWriter, {hashInputVector1_, hashInputVector2_, hashInputVector1_}, 2, {blockPid2, blockPid1}); } -TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) { +TEST_P(HashPartitioningShuffleWriterTest, hashLargeVectors) { const int32_t expectedMaxBatchSize = 8; ASSERT_NOT_OK(initShuffleWriterOptions()); auto shuffleWriter = createShuffleWriter(2); @@ -567,7 +573,7 @@ TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) { testShuffleRoundTrip(*shuffleWriter, {hashInputVector2_, hashInputVector1_}, 2, {blockPid2, blockPid1}); } -TEST_P(RangePartitioningShuffleWriter, range) { +TEST_P(RangePartitioningShuffleWriterTest, range) { ASSERT_NOT_OK(initShuffleWriterOptions()); auto shuffleWriter = createShuffleWriter(2); @@ -578,7 +584,7 @@ TEST_P(RangePartitioningShuffleWriter, range) { *shuffleWriter, {compositeBatch1_, compositeBatch2_, compositeBatch1_}, 2, {blockPid1, blockPid2}); } -TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) { +TEST_P(RoundRobinPartitioningShuffleWriterTest, roundRobin) { ASSERT_NOT_OK(initShuffleWriterOptions()); auto shuffleWriter = createShuffleWriter(2); @@ -588,7 +594,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) { testShuffleRoundTrip(*shuffleWriter, {inputVector1_, inputVector2_, inputVector1_}, 2, {blockPid1, blockPid2}); } -TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) { +TEST_P(RoundRobinPartitioningShuffleWriterTest, preAllocForceRealloc) { if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) { return; } @@ -646,7 +652,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) { ASSERT_NOT_OK(shuffleWriter->stop()); } -TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceReuse) { +TEST_P(RoundRobinPartitioningShuffleWriterTest, preAllocForceReuse) { if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) { return; } @@ -681,7 +687,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceReuse) { ASSERT_NOT_OK(shuffleWriter->stop()); } -TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) { +TEST_P(RoundRobinPartitioningShuffleWriterTest, spillVerifyResult) { if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) { return; } @@ -721,7 +727,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) { shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2}); } -TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) { +TEST_P(RoundRobinPartitioningShuffleWriterTest, sortMaxRows) { if (GetParam().shuffleWriterType != ShuffleWriterType::kSortShuffle) { return; } @@ -738,22 +744,22 @@ TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) { INSTANTIATE_TEST_SUITE_P( SinglePartitioningShuffleWriterGroup, - SinglePartitioningShuffleWriter, + SinglePartitioningShuffleWriterTest, ::testing::ValuesIn(getTestParams())); INSTANTIATE_TEST_SUITE_P( RoundRobinPartitioningShuffleWriterGroup, - RoundRobinPartitioningShuffleWriter, + RoundRobinPartitioningShuffleWriterTest, ::testing::ValuesIn(getTestParams())); INSTANTIATE_TEST_SUITE_P( HashPartitioningShuffleWriterGroup, - HashPartitioningShuffleWriter, + HashPartitioningShuffleWriterTest, ::testing::ValuesIn(getTestParams())); INSTANTIATE_TEST_SUITE_P( RangePartitioningShuffleWriterGroup, - RangePartitioningShuffleWriter, + RangePartitioningShuffleWriterTest, ::testing::ValuesIn(getTestParams())); } // namespace gluten