Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 69 additions & 79 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1087,21 +1087,33 @@ struct FileWriterHelper {
int64_t footer_offset_;
};

// Helper class since coalescing will not happen if the file is zero copy
class NoZeroCopyBufferReader : public io::BufferReader {
using BufferReader::BufferReader;
bool supports_zero_copy() const override { return false; }
};

template <bool kCoalesce>
struct FileGeneratorWriterHelper : public FileWriterHelper {
Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
ReadStats* out_stats = nullptr) override {
auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
std::shared_ptr<io::RandomAccessFile> buf_reader;
if (kCoalesce) {
// Use a non-zero-copy enabled BufferReader so we can test paths properly
buf_reader = std::make_shared<NoZeroCopyBufferReader>(buffer_);
} else {
buf_reader = std::make_shared<io::BufferReader>(buffer_);
}
AsyncGenerator<std::shared_ptr<RecordBatch>> generator;

{
auto fut =
RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, options);
auto fut = RecordBatchFileReader::OpenAsync(buf_reader, footer_offset_, options);
// Do NOT assert OK since some tests check whether this fails properly
EXPECT_FINISHES(fut);
ARROW_ASSIGN_OR_RAISE(auto reader, fut.result());
EXPECT_EQ(num_batches_written_, reader->num_record_batches());
// Generator will keep reader alive internally
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator());
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(kCoalesce));
}

// Generator is async-reentrant
Expand Down Expand Up @@ -1241,8 +1253,7 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
using WriterHelper = WriterHelperType;

// Check simple RecordBatch roundtripping
template <typename Param>
void TestRoundTrip(Param&& param, const IpcWriteOptions& options) {
void TestRoundTrip(test::MakeRecordBatch param, const IpcWriteOptions& options) {
std::shared_ptr<RecordBatch> batch1;
std::shared_ptr<RecordBatch> batch2;
ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue
Expand All @@ -1262,8 +1273,8 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
}
}

template <typename Param>
void TestZeroLengthRoundTrip(Param&& param, const IpcWriteOptions& options) {
void TestZeroLengthRoundTrip(test::MakeRecordBatch param,
const IpcWriteOptions& options) {
std::shared_ptr<RecordBatch> batch1;
std::shared_ptr<RecordBatch> batch2;
ASSERT_OK(param(&batch1)); // NOLINT clang-tidy gtest issue
Expand All @@ -1285,6 +1296,16 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
}
}

void TestRoundTripWithOptions(test::MakeRecordBatch make_record_batch) {
TestRoundTrip(make_record_batch, IpcWriteOptions::Defaults());
TestZeroLengthRoundTrip(make_record_batch, IpcWriteOptions::Defaults());

IpcWriteOptions options;
options.write_legacy_ipc_format = true;
TestRoundTrip(make_record_batch, options);
TestZeroLengthRoundTrip(make_record_batch, options);
}

void TestDictionaryRoundtrip() {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(MakeDictionary(&batch));
Expand Down Expand Up @@ -1456,8 +1477,13 @@ class ReaderWriterMixin : public ExtensionTypesMixin {
class TestFileFormat : public ReaderWriterMixin<FileWriterHelper>,
public ::testing::TestWithParam<MakeRecordBatch*> {};

class TestFileFormatGenerator : public ReaderWriterMixin<FileGeneratorWriterHelper>,
public ::testing::TestWithParam<MakeRecordBatch*> {};
class TestFileFormatGenerator
: public ReaderWriterMixin<FileGeneratorWriterHelper</*kCoalesce=*/false>>,
public ::testing::TestWithParam<MakeRecordBatch*> {};

class TestFileFormatGeneratorCoalesced
: public ReaderWriterMixin<FileGeneratorWriterHelper</*kCoalesce=*/true>>,
public ::testing::TestWithParam<MakeRecordBatch*> {};

class TestStreamFormat : public ReaderWriterMixin<StreamWriterHelper>,
public ::testing::TestWithParam<MakeRecordBatch*> {};
Expand All @@ -1473,26 +1499,6 @@ class TestStreamDecoderLargeChunks
: public ReaderWriterMixin<StreamDecoderLargeChunksWriterHelper>,
public ::testing::TestWithParam<MakeRecordBatch*> {};

TEST_P(TestFileFormat, RoundTrip) {
TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());

IpcWriteOptions options;
options.write_legacy_ipc_format = true;
TestRoundTrip(*GetParam(), options);
TestZeroLengthRoundTrip(*GetParam(), options);
}

TEST_P(TestFileFormatGenerator, RoundTrip) {
TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());

IpcWriteOptions options;
options.write_legacy_ipc_format = true;
TestRoundTrip(*GetParam(), options);
TestZeroLengthRoundTrip(*GetParam(), options);
}

Status MakeDictionaryBatch(std::shared_ptr<RecordBatch>* out) {
auto f0_type = arrow::dictionary(int32(), utf8());
auto f1_type = arrow::dictionary(int8(), utf8());
Expand Down Expand Up @@ -1644,62 +1650,32 @@ TEST(TestDictionaryBatch, DictionaryReplacement) {
ASSERT_BATCHES_EQUAL(*in_batch, *out_batch);
}

TEST_P(TestStreamFormat, RoundTrip) {
TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());

IpcWriteOptions options;
options.write_legacy_ipc_format = true;
TestRoundTrip(*GetParam(), options);
TestZeroLengthRoundTrip(*GetParam(), options);
}
TEST_P(TestFileFormat, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }

TEST_P(TestStreamDecoderData, RoundTrip) {
TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TEST_P(TestFileFormatGenerator, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }

IpcWriteOptions options;
options.write_legacy_ipc_format = true;
TestRoundTrip(*GetParam(), options);
TestZeroLengthRoundTrip(*GetParam(), options);
TEST_P(TestFileFormatGeneratorCoalesced, RoundTrip) {
TestRoundTripWithOptions(*GetParam());
}

TEST_P(TestStreamDecoderBuffer, RoundTrip) {
TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TEST_P(TestStreamFormat, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }

IpcWriteOptions options;
options.write_legacy_ipc_format = true;
TestRoundTrip(*GetParam(), options);
TestZeroLengthRoundTrip(*GetParam(), options);
}
TEST_P(TestStreamDecoderData, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }

TEST_P(TestStreamDecoderSmallChunks, RoundTrip) {
TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TEST_P(TestStreamDecoderBuffer, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }

IpcWriteOptions options;
options.write_legacy_ipc_format = true;
TestRoundTrip(*GetParam(), options);
TestZeroLengthRoundTrip(*GetParam(), options);
}
TEST_P(TestStreamDecoderSmallChunks, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }

TEST_P(TestStreamDecoderLargeChunks, RoundTrip) {
TestRoundTrip(*GetParam(), IpcWriteOptions::Defaults());
TestZeroLengthRoundTrip(*GetParam(), IpcWriteOptions::Defaults());

IpcWriteOptions options;
options.write_legacy_ipc_format = true;
TestRoundTrip(*GetParam(), options);
TestZeroLengthRoundTrip(*GetParam(), options);
}
TEST_P(TestStreamDecoderLargeChunks, RoundTrip) { TestRoundTripWithOptions(*GetParam()); }

INSTANTIATE_TEST_SUITE_P(GenericIpcRoundTripTests, TestIpcRoundTrip,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormat,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGenerator,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(FileRoundTripTests, TestFileFormatGeneratorCoalesced,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(StreamRoundTripTests, TestStreamFormat,
::testing::ValuesIn(kBatchCases));
INSTANTIATE_TEST_SUITE_P(StreamDecoderDataRoundTripTests, TestStreamDecoderData,
Expand Down Expand Up @@ -1817,28 +1793,42 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
#endif

TEST_F(TestStreamFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }

TEST_F(TestFileFormat, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }

TEST_F(TestFileFormatGenerator, DictionaryRoundTrip) { TestDictionaryRoundtrip(); }
TEST_F(TestFileFormatGeneratorCoalesced, DictionaryRoundTrip) {
TestDictionaryRoundtrip();
}

TEST_F(TestStreamFormat, DifferentSchema) { TestWriteDifferentSchema(); }

TEST_F(TestFileFormat, DifferentSchema) { TestWriteDifferentSchema(); }

TEST_F(TestFileFormatGenerator, DifferentSchema) { TestWriteDifferentSchema(); }
TEST_F(TestFileFormatGeneratorCoalesced, DifferentSchema) { TestWriteDifferentSchema(); }

TEST_F(TestStreamFormat, NoRecordBatches) { TestWriteNoRecordBatches(); }

TEST_F(TestFileFormat, NoRecordBatches) { TestWriteNoRecordBatches(); }

TEST_F(TestFileFormatGenerator, NoRecordBatches) { TestWriteNoRecordBatches(); }
TEST_F(TestFileFormatGeneratorCoalesced, NoRecordBatches) { TestWriteNoRecordBatches(); }

TEST_F(TestStreamFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }

TEST_F(TestFileFormat, ReadFieldSubset) { TestReadSubsetOfFields(); }

TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
TEST_F(TestFileFormatGeneratorCoalesced, ReadFieldSubset) { TestReadSubsetOfFields(); }

TEST_F(TestFileFormatGeneratorCoalesced, Errors) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(MakeIntRecordBatch(&batch));

FileWriterHelper helper;
ASSERT_OK(helper.Init(batch->schema(), IpcWriteOptions::Defaults()));
ASSERT_OK(helper.WriteBatch(batch));
ASSERT_OK(helper.Finish());

auto buf_reader = std::make_shared<NoZeroCopyBufferReader>(helper.buffer_);
ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchFileReader::Open(buf_reader.get()));

ASSERT_RAISES_WITH_MESSAGE(Invalid, "Invalid: Cannot coalesce without an owned file",
reader->GetRecordBatchGenerator(/*coalesce=*/true));
}

class TrackedRandomAccessFile : public io::RandomAccessFile {
public:
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1296,11 +1296,13 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
}

std::shared_ptr<io::internal::ReadRangeCache> cached_source;
if (coalesce && file_->supports_zero_copy()) {
if (coalesce && !file_->supports_zero_copy()) {
if (!owned_file_) return Status::Invalid("Cannot coalesce without an owned file");
// Since the user is asking for all fields then we can cache the entire
// file (up to the footer)
return cached_source->Cache({{0, footer_offset_}});
cached_source = std::make_shared<io::internal::ReadRangeCache>(file_, io_context,
cache_options);
RETURN_NOT_OK(cached_source->Cache({{0, footer_offset_}}));
}
return WholeIpcFileRecordBatchGenerator(std::move(state), std::move(cached_source),
io_context, executor);
Expand Down