diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index af67d1d6b137..5a379b86565b 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -548,14 +548,18 @@ VeloxRssSortShuffleReaderDeserializer::VeloxRssSortShuffleReaderDeserializer( batchSize_(batchSize), veloxCompressionType_(veloxCompressionType), serde_(getNamedVectorSerde(facebook::velox::VectorSerde::Kind::kPresto)), - deserializeTime_(deserializeTime) { - constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize; - auto buffer = AlignedBuffer::allocate(kMaxReadBufferSize, veloxPool_.get()); - in_ = std::make_unique(std::move(in), std::move(buffer)); + deserializeTime_(deserializeTime), + arrowIn_(in) { serdeOptions_ = {false, veloxCompressionType_}; } std::shared_ptr VeloxRssSortShuffleReaderDeserializer::next() { + if (in_ == nullptr) { + constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize; + auto buffer = AlignedBuffer::allocate(kMaxReadBufferSize, veloxPool_.get()); + in_ = std::make_unique(std::move(arrowIn_), std::move(buffer)); + } + if (!in_->hasNext()) { return nullptr; } diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index 8ebdbf2bacab..d7aa145ca136 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -130,6 +130,7 @@ class VeloxRssSortShuffleReaderDeserializer : public ColumnarBatchIterator { facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_; int64_t& deserializeTime_; std::shared_ptr in_; + std::shared_ptr arrowIn_; }; class VeloxShuffleReaderDeserializerFactory {