diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 14a274dff42..1a4784fcf59 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1087,21 +1087,33 @@ struct FileWriterHelper { int64_t footer_offset_; }; +// Helper class since coalescing will not happen if the file is zero copy +class NoZeroCopyBufferReader : public io::BufferReader { + using BufferReader::BufferReader; + bool supports_zero_copy() const override { return false; } +}; + +template struct FileGeneratorWriterHelper : public FileWriterHelper { Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches, ReadStats* out_stats = nullptr) override { - auto buf_reader = std::make_shared(buffer_); + std::shared_ptr buf_reader; + if (kCoalesce) { + // Use a non-zero-copy enabled BufferReader so we can test paths properly + buf_reader = std::make_shared(buffer_); + } else { + buf_reader = std::make_shared(buffer_); + } AsyncGenerator> generator; { - auto fut = - RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, options); + auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options); // Do NOT assert OK since some tests check whether this fails properly EXPECT_FINISHES(fut); ARROW_ASSIGN_OR_RAISE(auto reader, fut.result()); EXPECT_EQ(num_batches_written_, reader->num_record_batches()); // Generator will keep reader alive internally - ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator()); + ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce)); } // Generator is async-reentrant @@ -1241,8 +1253,7 @@ class ReaderWriterMixin : public ExtensionTypesMixin { using WriterHelper = WriterHelperType; // Check simple RecordBatch roundtripping - template - void TestRoundTrip(Param&& param, const IpcWriteOptions& options) { + void TestRoundTrip(test::MakeRecordBatch param, const IpcWriteOptions& options) { std::shared_ptr batch1; std::shared_ptr batch2; ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue @@ -1262,8 +1273,8 @@ class ReaderWriterMixin : public ExtensionTypesMixin { } } - template - void TestZeroLengthRoundTrip(Param&& param, const IpcWriteOptions& options) { + void TestZeroLengthRoundTrip(test::MakeRecordBatch param, + const IpcWriteOptions& options) { std::shared_ptr batch1; std::shared_ptr batch2; ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue @@ -1285,6 +1296,16 @@ class ReaderWriterMixin : public ExtensionTypesMixin { } } + void TestRoundTripWithOptions(test::MakeRecordBatch make_record_batch) { + TestRoundTrip(make_record_batch, IpcWriteOptions::Defaults()); + TestZeroLengthRoundTrip(make_record_batch, IpcWriteOptions::Defaults()); + + IpcWriteOptions options; + options.write_legacy_ipc_format = true; + TestRoundTrip(make_record_batch, options); + TestZeroLengthRoundTrip(make_record_batch, options); + } + void TestDictionaryRoundtrip() { std::shared_ptr batch; ASSERT_OK(MakeDictionary(&batch)); @@ -1456,8 +1477,13 @@ class ReaderWriterMixin : public ExtensionTypesMixin { class TestFileFormat : public ReaderWriterMixin, public ::testing::TestWithParam {}; -class TestFileFormatGenerator : public ReaderWriterMixin, - public ::testing::TestWithParam {}; +class TestFileFormatGenerator + : public ReaderWriterMixin>, + public ::testing::TestWithParam {}; + +class TestFileFormatGeneratorCoalesced + : public ReaderWriterMixin>, + public ::testing::TestWithParam {}; class TestStreamFormat : public ReaderWriterMixin, public ::testing::TestWithParam {}; @@ -1473,26 +1499,6 @@ class TestStreamDecoderLargeChunks : public ReaderWriterMixin, public ::testing::TestWithParam {}; -TEST_P(TestFileFormat, RoundTrip) { - TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - - IpcWriteOptions options; - options.write_legacy_ipc_format = true; - TestRoundTrip(*GetParam(), options); - TestZeroLengthRoundTrip(*GetParam(), options); -} - -TEST_P(TestFileFormatGenerator, RoundTrip) { - TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - - IpcWriteOptions options; - options.write_legacy_ipc_format = true; - TestRoundTrip(*GetParam(), options); - TestZeroLengthRoundTrip(*GetParam(), options); -} - Status MakeDictionaryBatch(std::shared_ptr* out) { auto f0_type = arrow::dictionary(int32(), utf8()); auto f1_type = arrow::dictionary(int8(), utf8()); @@ -1644,55 +1650,23 @@ TEST(TestDictionaryBatch, DictionaryReplacement) { ASSERT_BATCHES_EQUAL(*in_batch, *out_batch); } -TEST_P(TestStreamFormat, RoundTrip) { - TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - - IpcWriteOptions options; - options.write_legacy_ipc_format = true; - TestRoundTrip(*GetParam(), options); - TestZeroLengthRoundTrip(*GetParam(), options); -} +TEST_P(TestFileFormat, RoundTrip) { TestRoundTripWithOptions(*GetParam()); } -TEST_P(TestStreamDecoderData, RoundTrip) { - TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); +TEST_P(TestFileFormatGenerator, RoundTrip) { TestRoundTripWithOptions(*GetParam()); } - IpcWriteOptions options; - options.write_legacy_ipc_format = true; - TestRoundTrip(*GetParam(), options); - TestZeroLengthRoundTrip(*GetParam(), options); +TEST_P(TestFileFormatGeneratorCoalesced, RoundTrip) { + TestRoundTripWithOptions(*GetParam()); } -TEST_P(TestStreamDecoderBuffer, RoundTrip) { - TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); +TEST_P(TestStreamFormat, RoundTrip) { TestRoundTripWithOptions(*GetParam()); } - IpcWriteOptions options; - options.write_legacy_ipc_format = true; - TestRoundTrip(*GetParam(), options); - TestZeroLengthRoundTrip(*GetParam(), options); -} +TEST_P(TestStreamDecoderData, RoundTrip) { TestRoundTripWithOptions(*GetParam()); } -TEST_P(TestStreamDecoderSmallChunks, RoundTrip) { - TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); +TEST_P(TestStreamDecoderBuffer, RoundTrip) { TestRoundTripWithOptions(*GetParam()); } - IpcWriteOptions options; - options.write_legacy_ipc_format = true; - TestRoundTrip(*GetParam(), options); - TestZeroLengthRoundTrip(*GetParam(), options); -} +TEST_P(TestStreamDecoderSmallChunks, RoundTrip) { TestRoundTripWithOptions(*GetParam()); } -TEST_P(TestStreamDecoderLargeChunks, RoundTrip) { - TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults()); - - IpcWriteOptions options; - options.write_legacy_ipc_format = true; - TestRoundTrip(*GetParam(), options); - TestZeroLengthRoundTrip(*GetParam(), options); -} +TEST_P(TestStreamDecoderLargeChunks, RoundTrip) { TestRoundTripWithOptions(*GetParam()); } INSTANTIATE_TEST_SUITE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, ::testing::ValuesIn(kBatchCases)); @@ -1700,6 +1674,8 @@ INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormat, ::testing::ValuesIn(kBatchCases)); INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGenerator, ::testing::ValuesIn(kBatchCases)); +INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGeneratorCoalesced, + ::testing::ValuesIn(kBatchCases)); INSTANTIATE_TEST_SUITE_P(StreamRoundTripTests, TestStreamFormat, ::testing::ValuesIn(kBatchCases)); INSTANTIATE_TEST_SUITE_P(StreamDecoderDataRoundTripTests, TestStreamDecoderData, @@ -1817,28 +1793,42 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) { #endif TEST_F(TestStreamFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); } - TEST_F(TestFileFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); } - TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip(); } +TEST_F(TestFileFormatGeneratorCoalesced, DictionaryRoundTrip) { + TestDictionaryRoundtrip(); +} TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); } - TEST_F(TestFileFormat, DifferentSchema) { TestWriteDifferentSchema(); } - TEST_F(TestFileFormatGenerator, DifferentSchema) { TestWriteDifferentSchema(); } +TEST_F(TestFileFormatGeneratorCoalesced, DifferentSchema) { TestWriteDifferentSchema(); } TEST_F(TestStreamFormat, NoRecordBatches) { TestWriteNoRecordBatches(); } - TEST_F(TestFileFormat, NoRecordBatches) { TestWriteNoRecordBatches(); } - TEST_F(TestFileFormatGenerator, NoRecordBatches) { TestWriteNoRecordBatches(); } +TEST_F(TestFileFormatGeneratorCoalesced, NoRecordBatches) { TestWriteNoRecordBatches(); } TEST_F(TestStreamFormat, ReadFieldSubset) { TestReadSubsetOfFields(); } - TEST_F(TestFileFormat, ReadFieldSubset) { TestReadSubsetOfFields(); } - TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); } +TEST_F(TestFileFormatGeneratorCoalesced, ReadFieldSubset) { TestReadSubsetOfFields(); } + +TEST_F(TestFileFormatGeneratorCoalesced, Errors) { + std::shared_ptr batch; + ASSERT_OK(MakeIntRecordBatch(&batch)); + + FileWriterHelper helper; + ASSERT_OK(helper.Init(batch->schema(), IpcWriteOptions::Defaults())); + ASSERT_OK(helper.WriteBatch(batch)); + ASSERT_OK(helper.Finish()); + + auto buf_reader = std::make_shared(helper.buffer_); + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchFileReader::Open(buf_reader.get())); + + ASSERT_RAISES_WITH_MESSAGE(Invalid, "Invalid: Cannot coalesce without an owned file", + reader->GetRecordBatchGenerator(/*coalesce=*/true)); +} class TrackedRandomAccessFile : public io::RandomAccessFile { public: diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 7875cd3cdc5..a5f31d74feb 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -1296,11 +1296,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { } std::shared_ptr cached_source; - if (coalesce && file_->supports_zero_copy()) { + if (coalesce && !file_->supports_zero_copy()) { if (!owned_file_) return Status::Invalid("Cannot coalesce without an owned file"); // Since the user is asking for all fields then we can cache the entire // file (up to the footer) - return cached_source->Cache({{0, footer_offset_}}); + cached_source = std::make_shared(file_, io_context, + cache_options); + RETURN_NOT_OK(cached_source->Cache({{0, footer_offset_}})); } return WholeIpcFileRecordBatchGenerator(std::move(state), std::move(cached_source), io_context, executor);