diff --git a/cpp/src/arrow/io/compressed.cc b/cpp/src/arrow/io/compressed.cc index d0aebda6faf1..4993ae2dd373 100644 --- a/cpp/src/arrow/io/compressed.cc +++ b/cpp/src/arrow/io/compressed.cc @@ -342,7 +342,7 @@ class CompressedInputStream::Impl { RETURN_NOT_OK(EnsureCompressedData()); if (compressed_pos_ == compressed_->size()) { // No more data to decompress - if (!fresh_decompressor_) { + if (!fresh_decompressor_ && !decompressor_->IsFinished()) { return Status::IOError("Truncated compressed stream"); } *has_data = false; diff --git a/cpp/src/arrow/io/compressed_test.cc b/cpp/src/arrow/io/compressed_test.cc index 7e21ba8e95f2..4da0a1bc1919 100644 --- a/cpp/src/arrow/io/compressed_test.cc +++ b/cpp/src/arrow/io/compressed_test.cc @@ -210,13 +210,28 @@ TEST_P(CompressedInputStreamTest, ConcatenatedStreams) { auto data2 = MakeCompressibleData(200); auto compressed1 = CompressDataOneShot(codec.get(), data1); auto compressed2 = CompressDataOneShot(codec.get(), data2); + std::vector expected; + std::copy(data1.begin(), data1.end(), std::back_inserter(expected)); + std::copy(data2.begin(), data2.end(), std::back_inserter(expected)); ASSERT_OK_AND_ASSIGN(auto concatenated, ConcatenateBuffers({compressed1, compressed2})); - std::vector decompressed, expected; + std::vector decompressed; ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed)); - std::copy(data1.begin(), data1.end(), std::back_inserter(expected)); - std::copy(data2.begin(), data2.end(), std::back_inserter(expected)); + ASSERT_EQ(decompressed.size(), expected.size()); + ASSERT_EQ(decompressed, expected); + + // Same, but with an empty decompressed stream in the middle + auto compressed_empty = CompressDataOneShot(codec.get(), {}); + ASSERT_OK_AND_ASSIGN(concatenated, + ConcatenateBuffers({compressed1, compressed_empty, compressed2})); + ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed)); + ASSERT_EQ(decompressed.size(), expected.size()); + ASSERT_EQ(decompressed, expected); + // Same, but with an empty decompressed stream at the end + ASSERT_OK_AND_ASSIGN(concatenated, + ConcatenateBuffers({compressed1, compressed2, compressed_empty})); + ASSERT_OK(RunCompressedInputStream(codec.get(), concatenated, &decompressed)); ASSERT_EQ(decompressed.size(), expected.size()); ASSERT_EQ(decompressed, expected); }