Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
facebook::velox::row::CompactRow row(vector);

if (fixedRowSize_.has_value()) {
rowSize_.resize(inputRows, fixedRowSize_.value() + sizeof(RowSizeType));
rowSize_.resize(inputRows, fixedRowSize_.value());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the real issue is that rowSize_ was resized to 4 more rows?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rowSize_ was wrong for fixed-width input. It was overcalculated by 4 bytes (the extra sizeof(RowSizeType) term).

} else {
rowSize_.resize(inputRows);
rowSizePrefixSum_.resize(inputRows + 1);
Expand Down
136 changes: 71 additions & 65 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams
std::shared_ptr<arrow::io::ReadableFile> file_;
};

class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
class SinglePartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
protected:
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t) override {
auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
Expand All @@ -311,7 +311,7 @@ class SinglePartitioningShuffleWriter : public VeloxShuffleWriterTest {
}
};

class HashPartitioningShuffleWriter : public VeloxShuffleWriterTest {
class HashPartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
protected:
void SetUp() override {
VeloxShuffleWriterTest::SetUp();
Expand Down Expand Up @@ -351,7 +351,7 @@ class HashPartitioningShuffleWriter : public VeloxShuffleWriterTest {
facebook::velox::RowVectorPtr hashInputVector2_;
};

class RangePartitioningShuffleWriter : public VeloxShuffleWriterTest {
class RangePartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
protected:
void SetUp() override {
VeloxShuffleWriterTest::SetUp();
Expand Down Expand Up @@ -394,7 +394,7 @@ class RangePartitioningShuffleWriter : public VeloxShuffleWriterTest {
std::shared_ptr<ColumnarBatch> compositeBatch2_;
};

class RoundRobinPartitioningShuffleWriter : public VeloxShuffleWriterTest {
class RoundRobinPartitioningShuffleWriterTest : public VeloxShuffleWriterTest {
protected:
std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(uint32_t numPartitions) override {
auto* arrowPool = getDefaultMemoryManager()->getArrowMemoryPool();
Expand All @@ -420,7 +420,7 @@ class RoundRobinPartitioningShuffleWriter : public VeloxShuffleWriterTest {
}
};

TEST_P(SinglePartitioningShuffleWriter, single) {
TEST_P(SinglePartitioningShuffleWriterTest, single) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
Expand Down Expand Up @@ -476,57 +476,63 @@ TEST_P(SinglePartitioningShuffleWriter, single) {
}
}

TEST_P(HashPartitioningShuffleWriter, hashPart1Vector) {
TEST_P(HashPartitioningShuffleWriterTest, hashPart1Vector) {
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(2);
auto vector = makeRowVector({
makeFlatVector<int32_t>({1, 2, 1, 2}),
makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt}),
makeFlatVector<int64_t>({1, 2, 3, 4}),
makeFlatVector<StringView>({"nn", "re", "fr", "juiu"}),
makeFlatVector<int64_t>({232, 34567235, 1212, 4567}, DECIMAL(12, 4)),
makeFlatVector<int128_t>({232, 34567235, 1212, 4567}, DECIMAL(20, 4)),
makeFlatVector<int32_t>(
4, [](vector_size_t row) { return row % 2; }, nullEvery(5), DATE()),
makeFlatVector<Timestamp>(
4,
[](vector_size_t row) {
return Timestamp{row % 2, 0};
},
nullEvery(5)),
});

auto rowType = facebook::velox::asRowType(vector->type());
auto children = rowType->children();
auto names = rowType->names();
children.erase(children.begin());
names.erase(names.begin());
auto dataType = facebook::velox::ROW(std::move(names), std::move(children));

auto firstBlock = makeRowVector({
makeNullableFlatVector<int8_t>({2, std::nullopt}),
makeFlatVector<int64_t>({2, 4}),
makeFlatVector<StringView>({"re", "juiu"}),
makeFlatVector<int64_t>({34567235, 4567}, DECIMAL(12, 4)),
makeFlatVector<int128_t>({34567235, 4567}, DECIMAL(20, 4)),
makeFlatVector<int32_t>({1, 1}, DATE()),
makeFlatVector<Timestamp>({Timestamp(1, 0), Timestamp(1, 0)}),
});

auto secondBlock = makeRowVector({
makeNullableFlatVector<int8_t>({1, 3}),
makeFlatVector<int64_t>({1, 3}),
makeFlatVector<StringView>({"nn", "fr"}),
makeFlatVector<int64_t>({232, 1212}, DECIMAL(12, 4)),
makeFlatVector<int128_t>({232, 1212}, DECIMAL(20, 4)),
makeNullableFlatVector<int32_t>({std::nullopt, 0}, DATE()),
makeNullableFlatVector<Timestamp>({std::nullopt, Timestamp(0, 0)}),
});

testShuffleRoundTrip(*shuffleWriter, {vector}, 2, {{firstBlock}, {secondBlock}});

// Fixed-length input.
{
auto shuffleWriter = createShuffleWriter(2);

std::vector<VectorPtr> data = {
makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt}),
makeNullableFlatVector<int16_t>({1, 2, 3, 4}),
makeFlatVector<int64_t>({1, 2, 3, 4}),
makeFlatVector<int64_t>({232, 34567235, 1212, 4567}, DECIMAL(12, 4)),
makeFlatVector<int128_t>({232, 34567235, 1212, 4567}, DECIMAL(20, 4)),
makeFlatVector<int32_t>(
4, [](vector_size_t row) { return row % 2; }, nullEvery(5), DATE()),
makeFlatVector<Timestamp>(
4,
[](vector_size_t row) {
return Timestamp{row % 2, 0};
},
nullEvery(5))};

const auto vector = makeRowVector(data);

const auto blocksPid0 = takeRows({vector}, {{1, 3}});
const auto blocksPid1 = takeRows({vector}, {{0, 2}});

// Add partition id as the first column.
data.insert(data.begin(), makeFlatVector<int32_t>({1, 2, 1, 2}));
const auto input = makeRowVector(data);

testShuffleRoundTrip(*shuffleWriter, {input}, 2, {blocksPid0, blocksPid1});
}

// Variable-length input.
{
auto shuffleWriter = createShuffleWriter(2);

std::vector<VectorPtr> data = {
makeFlatVector<StringView>({"nn", "", "fr", "juiu"}),
makeNullableFlatVector<int8_t>({1, 2, 3, std::nullopt}),
makeNullableFlatVector<StringView>({std::nullopt, "de", "10 I'm not inline string", "de"})};

const auto vector = makeRowVector(data);

const auto blocksPid0 = takeRows({vector}, {{0, 1, 3}});
const auto blocksPid1 = takeRows({vector}, {{2}});

// Add partition id as the first column.
data.insert(data.begin(), makeFlatVector<int32_t>({2, 2, 1, 2}));
const auto input = makeRowVector(data);

testShuffleRoundTrip(*shuffleWriter, {input}, 2, {blocksPid0, blocksPid1});
}
}

TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
TEST_P(HashPartitioningShuffleWriterTest, hashPart1VectorComplexType) {
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(2);
auto children = childrenComplex_;
Expand All @@ -538,7 +544,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart1VectorComplexType) {
testShuffleRoundTrip(*shuffleWriter, {vector}, 2, {firstBlock, secondBlock});
}

TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
TEST_P(HashPartitioningShuffleWriterTest, hashPart3Vectors) {
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(2);

Expand All @@ -549,7 +555,7 @@ TEST_P(HashPartitioningShuffleWriter, hashPart3Vectors) {
*shuffleWriter, {hashInputVector1_, hashInputVector2_, hashInputVector1_}, 2, {blockPid2, blockPid1});
}

TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
TEST_P(HashPartitioningShuffleWriterTest, hashLargeVectors) {
const int32_t expectedMaxBatchSize = 8;
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(2);
Expand All @@ -567,7 +573,7 @@ TEST_P(HashPartitioningShuffleWriter, hashLargeVectors) {
testShuffleRoundTrip(*shuffleWriter, {hashInputVector2_, hashInputVector1_}, 2, {blockPid2, blockPid1});
}

TEST_P(RangePartitioningShuffleWriter, range) {
TEST_P(RangePartitioningShuffleWriterTest, range) {
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(2);

Expand All @@ -578,7 +584,7 @@ TEST_P(RangePartitioningShuffleWriter, range) {
*shuffleWriter, {compositeBatch1_, compositeBatch2_, compositeBatch1_}, 2, {blockPid1, blockPid2});
}

TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
TEST_P(RoundRobinPartitioningShuffleWriterTest, roundRobin) {
ASSERT_NOT_OK(initShuffleWriterOptions());
auto shuffleWriter = createShuffleWriter(2);

Expand All @@ -588,7 +594,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, roundRobin) {
testShuffleRoundTrip(*shuffleWriter, {inputVector1_, inputVector2_, inputVector1_}, 2, {blockPid1, blockPid2});
}

TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
TEST_P(RoundRobinPartitioningShuffleWriterTest, preAllocForceRealloc) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
Expand Down Expand Up @@ -646,7 +652,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceReuse) {
TEST_P(RoundRobinPartitioningShuffleWriterTest, preAllocForceReuse) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
Expand Down Expand Up @@ -681,7 +687,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceReuse) {
ASSERT_NOT_OK(shuffleWriter->stop());
}

TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
TEST_P(RoundRobinPartitioningShuffleWriterTest, spillVerifyResult) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kHashShuffle) {
return;
}
Expand Down Expand Up @@ -721,7 +727,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, spillVerifyResult) {
shuffleWriteReadMultiBlocks(*shuffleWriter, 2, {blockPid1, blockPid2});
}

TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {
TEST_P(RoundRobinPartitioningShuffleWriterTest, sortMaxRows) {
if (GetParam().shuffleWriterType != ShuffleWriterType::kSortShuffle) {
return;
}
Expand All @@ -738,22 +744,22 @@ TEST_P(RoundRobinPartitioningShuffleWriter, sortMaxRows) {

INSTANTIATE_TEST_SUITE_P(
SinglePartitioningShuffleWriterGroup,
SinglePartitioningShuffleWriter,
SinglePartitioningShuffleWriterTest,
::testing::ValuesIn(getTestParams()));

INSTANTIATE_TEST_SUITE_P(
RoundRobinPartitioningShuffleWriterGroup,
RoundRobinPartitioningShuffleWriter,
RoundRobinPartitioningShuffleWriterTest,
::testing::ValuesIn(getTestParams()));

INSTANTIATE_TEST_SUITE_P(
HashPartitioningShuffleWriterGroup,
HashPartitioningShuffleWriter,
HashPartitioningShuffleWriterTest,
::testing::ValuesIn(getTestParams()));

INSTANTIATE_TEST_SUITE_P(
RangePartitioningShuffleWriterGroup,
RangePartitioningShuffleWriter,
RangePartitioningShuffleWriterTest,
::testing::ValuesIn(getTestParams()));

} // namespace gluten
Expand Down