From cf4d62e82d72f537ded62cb22c20b7c3b8f94cf9 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 20 Apr 2022 09:38:58 -0400
Subject: [PATCH 1/2] ARROW-16238: [C++] Fix nullptr dereference when
pre-buffering IPC reads
---
cpp/src/arrow/ipc/read_write_test.cc | 146 ++++++++++++---------------
cpp/src/arrow/ipc/reader.cc | 6 +-
2 files changed, 70 insertions(+), 82 deletions(-)
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 14a274dff42..f776dac7037 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -1087,21 +1087,33 @@ struct FileWriterHelper {
int64_t footer_offset_;
};
+// Test-only BufferReader that reports supports_zero_copy() == false. Coalescing is
+// only attempted for files without zero-copy support, so this lets tests reach it.
+class NoZeroCopyBufferReader : public io::BufferReader {
+ // Inherit the base-class constructors unchanged.
+ using BufferReader::BufferReader;
+ bool supports_zero_copy() const override { return false; }
+};
+
+template
struct FileGeneratorWriterHelper : public FileWriterHelper {
Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
ReadStats* out_stats = nullptr) override {
- auto buf_reader = std::make_shared(buffer_);
+ std::shared_ptr buf_reader;
+ if (kCoalesce) {
+ // Use a BufferReader without zero-copy support so the coalescing path is exercised
+ buf_reader = std::make_shared(buffer_);
+ } else {
+ buf_reader = std::make_shared(buffer_);
+ }
AsyncGenerator> generator;
{
- auto fut =
- RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, options);
+ auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
// Do NOT assert OK since some tests check whether this fails properly
EXPECT_FINISHES(fut);
ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
EXPECT_EQ(num_batches_written_, reader->num_record_batches());
// Generator will keep reader alive internally
- ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator());
+ ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
}
// Generator is async-reentrant
@@ -1241,8 +1253,7 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
using WriterHelper = WriterHelperType;
// Check simple RecordBatch roundtripping
- template
- void TestRoundTrip(Param&& param, const IpcWriteOptions& options) {
+ void TestRoundTrip(test::MakeRecordBatch param, const IpcWriteOptions& options) {
std::shared_ptr batch1;
std::shared_ptr batch2;
ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue
@@ -1262,8 +1273,8 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
}
}
- template
- void TestZeroLengthRoundTrip(Param&& param, const IpcWriteOptions& options) {
+ void TestZeroLengthRoundTrip(test::MakeRecordBatch param,
+ const IpcWriteOptions& options) {
std::shared_ptr batch1;
std::shared_ptr batch2;
ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue
@@ -1285,6 +1296,16 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
}
}
+ // Runs TestRoundTrip and TestZeroLengthRoundTrip for the given batch factory twice:
+ // once with default write options and once with write_legacy_ipc_format enabled.
+ void TestRoundTripWithOptions(test::MakeRecordBatch make_record_batch) {
+ TestRoundTrip(make_record_batch, IpcWriteOptions::Defaults());
+ TestZeroLengthRoundTrip(make_record_batch, IpcWriteOptions::Defaults());
+
+ // Second pass: same two checks with the legacy IPC format flag set.
+ IpcWriteOptions options;
+ options.write_legacy_ipc_format = true;
+ TestRoundTrip(make_record_batch, options);
+ TestZeroLengthRoundTrip(make_record_batch, options);
+ }
+
void TestDictionaryRoundtrip() {
std::shared_ptr batch;
ASSERT_OK(MakeDictionary(&batch));
@@ -1456,8 +1477,13 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
class TestFileFormat : public ReaderWriterMixin,
public ::testing::TestWithParam {};
-class TestFileFormatGenerator : public ReaderWriterMixin,
- public ::testing::TestWithParam {};
+class TestFileFormatGenerator
+ : public ReaderWriterMixin>,
+ public ::testing::TestWithParam {};
+
+class TestFileFormatCoalesced
+ : public ReaderWriterMixin>,
+ public ::testing::TestWithParam {};
class TestStreamFormat : public ReaderWriterMixin,
public ::testing::TestWithParam {};
@@ -1473,26 +1499,6 @@ class TestStreamDecoderLargeChunks
: public ReaderWriterMixin,
public ::testing::TestWithParam {};
-TEST_P(TestFileFormat, RoundTrip) {
- TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
- TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
-
- IpcWriteOptions options;
- options.write_legacy_ipc_format = true;
- TestRoundTrip(*GetParam(), options);
- TestZeroLengthRoundTrip(*GetParam(), options);
-}
-
-TEST_P(TestFileFormatGenerator, RoundTrip) {
- TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
- TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
-
- IpcWriteOptions options;
- options.write_legacy_ipc_format = true;
- TestRoundTrip(*GetParam(), options);
- TestZeroLengthRoundTrip(*GetParam(), options);
-}
-
Status MakeDictionaryBatch(std::shared_ptr* out) {
auto f0_type = arrow::dictionary(int32(), utf8());
auto f1_type = arrow::dictionary(int8(), utf8());
@@ -1644,55 +1650,21 @@ TEST(TestDictionaryBatch, DictionaryReplacement) {
ASSERT_BATCHES_EQUAL(*in_batch, *out_batch);
}
-TEST_P(TestStreamFormat, RoundTrip) {
- TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
- TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
+TEST_P(TestFileFormat, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
- IpcWriteOptions options;
- options.write_legacy_ipc_format = true;
- TestRoundTrip(*GetParam(), options);
- TestZeroLengthRoundTrip(*GetParam(), options);
-}
+TEST_P(TestFileFormatGenerator, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
-TEST_P(TestStreamDecoderData, RoundTrip) {
- TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
- TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
+TEST_P(TestFileFormatCoalesced, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
- IpcWriteOptions options;
- options.write_legacy_ipc_format = true;
- TestRoundTrip(*GetParam(), options);
- TestZeroLengthRoundTrip(*GetParam(), options);
-}
+TEST_P(TestStreamFormat, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
-TEST_P(TestStreamDecoderBuffer, RoundTrip) {
- TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
- TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
+TEST_P(TestStreamDecoderData, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
- IpcWriteOptions options;
- options.write_legacy_ipc_format = true;
- TestRoundTrip(*GetParam(), options);
- TestZeroLengthRoundTrip(*GetParam(), options);
-}
+TEST_P(TestStreamDecoderBuffer, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
-TEST_P(TestStreamDecoderSmallChunks, RoundTrip) {
- TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
- TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
+TEST_P(TestStreamDecoderSmallChunks, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
- IpcWriteOptions options;
- options.write_legacy_ipc_format = true;
- TestRoundTrip(*GetParam(), options);
- TestZeroLengthRoundTrip(*GetParam(), options);
-}
-
-TEST_P(TestStreamDecoderLargeChunks, RoundTrip) {
- TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
- TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
-
- IpcWriteOptions options;
- options.write_legacy_ipc_format = true;
- TestRoundTrip(*GetParam(), options);
- TestZeroLengthRoundTrip(*GetParam(), options);
-}
+TEST_P(TestStreamDecoderLargeChunks, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
INSTANTIATE_TEST_SUITE_P(GenericIpcRoundTripTests, TestIpcRoundTrip,
::testing::ValuesIn(kBatchCases));
@@ -1700,6 +1672,8 @@ INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormat,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGenerator,
::testing::ValuesIn(kBatchCases));
+INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatCoalesced,
+ ::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(StreamRoundTripTests, TestStreamFormat,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(StreamDecoderDataRoundTripTests, TestStreamDecoderData,
@@ -1817,28 +1791,40 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
#endif
TEST_F(TestStreamFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
-
TEST_F(TestFileFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
-
TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
+TEST_F(TestFileFormatCoalesced, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); }
-
TEST_F(TestFileFormat, DifferentSchema) { TestWriteDifferentSchema(); }
-
TEST_F(TestFileFormatGenerator, DifferentSchema) { TestWriteDifferentSchema(); }
+TEST_F(TestFileFormatCoalesced, DifferentSchema) { TestWriteDifferentSchema(); }
TEST_F(TestStreamFormat, NoRecordBatches) { TestWriteNoRecordBatches(); }
-
TEST_F(TestFileFormat, NoRecordBatches) { TestWriteNoRecordBatches(); }
-
TEST_F(TestFileFormatGenerator, NoRecordBatches) { TestWriteNoRecordBatches(); }
+TEST_F(TestFileFormatCoalesced, NoRecordBatches) { TestWriteNoRecordBatches(); }
TEST_F(TestStreamFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }
-
TEST_F(TestFileFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }
-
TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
+TEST_F(TestFileFormatCoalesced, ReadFieldSubset) { TestReadSubsetOfFields(); }
+
+TEST_F(TestFileFormatCoalesced, Errors) {
+ std::shared_ptr batch;
+ ASSERT_OK(MakeIntRecordBatch(&batch));
+
+ // Write a single-batch IPC file through the helper; the bytes end up in helper.buffer_.
+ FileWriterHelper helper;
+ ASSERT_OK(helper.Init(batch->schema(), IpcWriteOptions::Defaults()));
+ ASSERT_OK(helper.WriteBatch(batch));
+ ASSERT_OK(helper.Finish());
+
+ // Open the reader from a raw pointer (buf_reader.get()) rather than the shared_ptr.
+ // NOTE(review): presumably this leaves the reader without an owned file -- confirm.
+ auto buf_reader = std::make_shared(helper.buffer_);
+ ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchFileReader::Open(buf_reader.get()));
+
+ // Requesting a coalescing generator must fail with exactly this Invalid message.
+ ASSERT_RAISES_WITH_MESSAGE(Invalid, "Invalid: Cannot coalesce without an owned file",
+ reader->GetRecordBatchGenerator(/*coalesce=*/true));
+}
class TrackedRandomAccessFile : public io::RandomAccessFile {
public:
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 7875cd3cdc5..a5f31d74feb 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1296,11 +1296,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}
std::shared_ptr cached_source;
- if (coalesce && file_->supports_zero_copy()) {
+ if (coalesce && !file_->supports_zero_copy()) {
if (!owned_file_) return Status::Invalid("Cannot coalesce without an owned file");
// Since the user is asking for all fields then we can cache the entire
// file (up to the footer)
- return cached_source->Cache({{0, footer_offset_}});
+ cached_source = std::make_shared(file_, io_context,
+ cache_options);
+ RETURN_NOT_OK(cached_source->Cache({{0, footer_offset_}}));
}
return WholeIpcFileRecordBatchGenerator(std::move(state), std::move(cached_source),
io_context, executor);
From b6d09598fb656a38d4ea1c9428920ac58ae7fd24 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 22 Apr 2022 12:07:50 -0400
Subject: [PATCH 2/2] Rename test fixture
---
cpp/src/arrow/ipc/read_write_test.cc | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index f776dac7037..1a4784fcf59 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -1481,7 +1481,7 @@ class TestFileFormatGenerator
: public ReaderWriterMixin>,
public ::testing::TestWithParam {};
-class TestFileFormatCoalesced
+class TestFileFormatGeneratorCoalesced
: public ReaderWriterMixin>,
public ::testing::TestWithParam {};
@@ -1654,7 +1654,9 @@ TEST_P(TestFileFormat, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
TEST_P(TestFileFormatGenerator, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
-TEST_P(TestFileFormatCoalesced, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
+TEST_P(TestFileFormatGeneratorCoalesced, RoundTrip) {
+ TestRoundTripWithOptions(*GetParam());
+}
TEST_P(TestStreamFormat, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }
@@ -1672,7 +1674,7 @@ INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormat,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGenerator,
::testing::ValuesIn(kBatchCases));
-INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatCoalesced,
+INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGeneratorCoalesced,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(StreamRoundTripTests, TestStreamFormat,
::testing::ValuesIn(kBatchCases));
@@ -1793,24 +1795,26 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
TEST_F(TestStreamFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
TEST_F(TestFileFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
-TEST_F(TestFileFormatCoalesced, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
+TEST_F(TestFileFormatGeneratorCoalesced, DictionaryRoundTrip) {
+ TestDictionaryRoundtrip();
+}
TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); }
TEST_F(TestFileFormat, DifferentSchema) { TestWriteDifferentSchema(); }
TEST_F(TestFileFormatGenerator, DifferentSchema) { TestWriteDifferentSchema(); }
-TEST_F(TestFileFormatCoalesced, DifferentSchema) { TestWriteDifferentSchema(); }
+TEST_F(TestFileFormatGeneratorCoalesced, DifferentSchema) { TestWriteDifferentSchema(); }
TEST_F(TestStreamFormat, NoRecordBatches) { TestWriteNoRecordBatches(); }
TEST_F(TestFileFormat, NoRecordBatches) { TestWriteNoRecordBatches(); }
TEST_F(TestFileFormatGenerator, NoRecordBatches) { TestWriteNoRecordBatches(); }
-TEST_F(TestFileFormatCoalesced, NoRecordBatches) { TestWriteNoRecordBatches(); }
+TEST_F(TestFileFormatGeneratorCoalesced, NoRecordBatches) { TestWriteNoRecordBatches(); }
TEST_F(TestStreamFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }
TEST_F(TestFileFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }
TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
-TEST_F(TestFileFormatCoalesced, ReadFieldSubset) { TestReadSubsetOfFields(); }
+TEST_F(TestFileFormatGeneratorCoalesced, ReadFieldSubset) { TestReadSubsetOfFields(); }
-TEST_F(TestFileFormatCoalesced, Errors) {
+TEST_F(TestFileFormatGeneratorCoalesced, Errors) {
std::shared_ptr batch;
ASSERT_OK(MakeIntRecordBatch(&batch));