From 099d6d0217ba9db076ba440771db508fe2136aaf Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 27 Apr 2021 16:10:30 -0400
Subject: [PATCH 1/8] ARROW-11843: [C++] Specialize Future<>::Future(Status)
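
Future<T> is implicitly constructible from a Status (by way of Result<T>),
but the generic constructor does not work for Future<>, so specialize it to
route through internal::Empty::ToResult, which also accepts an OK status.
This lets a function declared to return Future<> return a bare Status. A
minimal sketch of the usage this enables (the Invalid status is
illustrative only):

    ::arrow::Future<> Check(bool ready) {
      if (!ready) {
        // Implicitly converted to a finished, failed Future<>
        return ::arrow::Status::Invalid("not ready");
      }
      return ::arrow::Future<>::MakeFinished();
    }

The next patch relies on this in SerializedFile::WhenBuffered, which
returns a Status from a method declared to return Future<>.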
---
cpp/src/arrow/util/future.h | 3 +++
1 file changed, 3 insertions(+)
diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h
index 132443176ed..cd90d943fcd 100644
--- a/cpp/src/arrow/util/future.h
+++ b/cpp/src/arrow/util/future.h
@@ -829,6 +829,9 @@ Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
return out;
}
+template <>
+inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}
+
/// \brief Create a Future which completes when all of `futures` complete.
///
/// The future will be marked complete if all `futures` complete
From 54fc2ac01af5912a4886699a76eef90233d37bef Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 2 Mar 2021 15:11:59 -0500
Subject: [PATCH 2/8] ARROW-11843: [C++] Provide reentrant Parquet reader
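
Reworks the Parquet reader so a single file can be read concurrently and
without blocking:

- PreBuffer() now just primes the ReadRangeCache and returns void; callers
  that need to block on the I/O use the new WhenBuffered() instead.
- parquet::arrow::FileReader gains GetRecordBatchGenerator(), an
  AsyncGenerator of record batches backed by a per-row-group generator.
- ParquetFileReader gains OpenAsync(), which reads and parses the footer
  without blocking the calling thread.

A rough usage sketch (mirroring the new benchmark below; the reader must
be held in a shared_ptr since the generator keeps it alive):

    // reader is a std::shared_ptr<FileReader>; it must outlive the generator.
    ARROW_ASSIGN_OR_RAISE(auto gen,
        reader->GetRecordBatchGenerator(reader, /*row_groups=*/{0, 1},
                                        /*column_indices=*/{0}));
    // Future that resolves to all batches, in row-group order:
    auto batches_fut = ::arrow::CollectAsyncGenerator(std::move(gen));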
---
cpp/src/arrow/python/io.cc | 1 +
cpp/src/arrow/testing/future_util.h | 7 +
.../parquet/arrow/arrow_reader_writer_test.cc | 66 +++-
cpp/src/parquet/arrow/reader.cc | 127 ++++++-
cpp/src/parquet/arrow/reader.h | 15 +
.../parquet/arrow/reader_writer_benchmark.cc | 40 +-
.../encryption/test_encryption_util.cc | 7 +-
cpp/src/parquet/file_reader.cc | 347 +++++++++++++-----
cpp/src/parquet/file_reader.h | 31 +-
cpp/src/parquet/properties.h | 2 +-
cpp/src/parquet/reader_test.cc | 70 ++++
python/pyarrow/_dataset.pyx | 8 +-
12 files changed, 596 insertions(+), 125 deletions(-)
diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc
index 73525feed38..d83e08b27e6 100644
--- a/cpp/src/arrow/python/io.cc
+++ b/cpp/src/arrow/python/io.cc
@@ -26,6 +26,7 @@
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
+#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/python/common.h"
diff --git a/cpp/src/arrow/testing/future_util.h b/cpp/src/arrow/testing/future_util.h
index 190e5839bbf..878840587ff 100644
--- a/cpp/src/arrow/testing/future_util.h
+++ b/cpp/src/arrow/testing/future_util.h
@@ -52,6 +52,13 @@
ASSERT_RAISES(ENUM, _fut.status()); \
} while (false)
+#define EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(ENUM, matcher, expr) \
+ do { \
+ auto&& fut = (expr); \
+ ASSERT_FINISHES_IMPL(fut); \
+ EXPECT_RAISES_WITH_MESSAGE_THAT(ENUM, matcher, fut.status()); \
+ } while (false)
+
#define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, _future_name) \
auto _future_name = (rexpr); \
ASSERT_FINISHES_IMPL(_future_name); \
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 303fb454880..3f2cbee6ebd 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2287,10 +2287,9 @@ TEST(TestArrowReadWrite, WaitCoalescedReads) {
ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
ASSERT_OK(builder.properties(properties)->Build(&reader));
// Pre-buffer data and wait for I/O to complete.
- ASSERT_OK(reader->parquet_reader()
- ->PreBuffer({0}, {0, 1, 2, 3, 4}, ::arrow::io::IOContext(),
- ::arrow::io::CacheOptions::Defaults())
- .status());
+ reader->parquet_reader()->PreBuffer({0}, {0, 1, 2, 3, 4}, ::arrow::io::IOContext(),
+ ::arrow::io::CacheOptions::Defaults());
+ ASSERT_OK(reader->parquet_reader()->WhenBuffered({0}, {0, 1, 2, 3, 4}).status());
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4}, &rb_reader));
@@ -2331,6 +2330,63 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
ASSERT_EQ(actual_batch->num_rows(), num_rows);
}
+TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
+ ArrowReaderProperties properties = default_arrow_reader_properties();
+ const int num_rows = 1024;
+ const int row_group_size = 512;
+ const int num_columns = 2;
+
+ std::shared_ptr<Table> table;
+ ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
+
+ std::shared_ptr<Buffer> buffer;
+ ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+ default_arrow_writer_properties(), &buffer));
+
+ std::shared_ptr<FileReader> reader;
+ {
+ std::unique_ptr<FileReader> unique_reader;
+ FileReaderBuilder builder;
+ ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
+ ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
+ reader = std::move(unique_reader);
+ }
+
+ auto check_batches = [](const std::shared_ptr<::arrow::RecordBatch>& batch,
+ int num_columns, int num_rows) {
+ ASSERT_NE(batch, nullptr);
+ ASSERT_EQ(batch->num_columns(), num_columns);
+ ASSERT_EQ(batch->num_rows(), num_rows);
+ };
+ {
+ ASSERT_OK_AND_ASSIGN(auto batch_generator,
+ reader->GetRecordBatchGenerator(reader, {0, 1}, {0, 1}));
+ auto fut1 = batch_generator();
+ auto fut2 = batch_generator();
+ auto fut3 = batch_generator();
+ ASSERT_OK_AND_ASSIGN(auto batch1, fut1.result());
+ ASSERT_OK_AND_ASSIGN(auto batch2, fut2.result());
+ ASSERT_OK_AND_ASSIGN(auto batch3, fut3.result());
+ ASSERT_EQ(batch3, nullptr);
+ check_batches(batch1, num_columns, row_group_size);
+ check_batches(batch2, num_columns, row_group_size);
+ }
+ {
+ // No columns case
+ ASSERT_OK_AND_ASSIGN(auto batch_generator,
+ reader->GetRecordBatchGenerator(reader, {0, 1}, {}));
+ auto fut1 = batch_generator();
+ auto fut2 = batch_generator();
+ auto fut3 = batch_generator();
+ ASSERT_OK_AND_ASSIGN(auto batch1, fut1.result());
+ ASSERT_OK_AND_ASSIGN(auto batch2, fut2.result());
+ ASSERT_OK_AND_ASSIGN(auto batch3, fut3.result());
+ ASSERT_EQ(batch3, nullptr);
+ check_batches(batch1, 0, row_group_size);
+ check_batches(batch2, 0, row_group_size);
+ }
+}
+
TEST(TestArrowReadWrite, ScanContents) {
const int num_columns = 20;
const int num_rows = 1000;
@@ -2700,7 +2756,7 @@ TEST(ArrowReadWrite, Decimal256) {
auto type = ::arrow::decimal256(8, 4);
- const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678",
+ const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678",
"-9999.9999", "9999.9999"])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 016ceacb0ef..92f7aafe120 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -30,7 +30,9 @@
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/type.h"
+#include "arrow/util/async_generator.h"
#include "arrow/util/bit_util.h"
+#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
@@ -291,6 +293,11 @@ class FileReaderImpl : public FileReader {
const std::vector<int>& indices,
std::shared_ptr<Table>* table) override;
+ // Helper method used by ReadRowGroups/Generator - read the given row groups/columns,
+ // skipping bounds checks and pre-buffering.
+ Status DecodeRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& indices, std::shared_ptr<Table>* table);
+
Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* table) override {
return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table);
@@ -315,6 +322,12 @@ class FileReaderImpl : public FileReader {
Iota(reader_->metadata()->num_columns()), out);
}
+ ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+ GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+ const std::vector<int> row_group_indices,
+ const std::vector<int> column_indices,
+ ::arrow::internal::Executor* executor) override;
+
int num_columns() const { return reader_->metadata()->num_columns(); }
ParquetFileReader* parquet_reader() const override { return reader_.get(); }
@@ -890,9 +903,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
if (reader_properties_.pre_buffer()) {
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
BEGIN_PARQUET_CATCH_EXCEPTIONS
- ARROW_UNUSED(reader_->PreBuffer(row_groups, column_indices,
- reader_properties_.io_context(),
- reader_properties_.cache_options()));
+ reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
+ reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
@@ -968,6 +980,102 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
return Status::OK();
}
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch generators (where each sub-generator is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+ using RecordBatchGenerator =
+ ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+ explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
+ ::arrow::internal::Executor* executor,
+ std::vector<int> row_groups, std::vector<int> column_indices)
+ : arrow_reader_(std::move(arrow_reader)),
+ executor_(executor),
+ row_groups_(std::move(row_groups)),
+ column_indices_(std::move(column_indices)),
+ index_(0) {}
+
+ ::arrow::Future<RecordBatchGenerator> operator()() {
+ if (index_ >= row_groups_.size()) {
+ return ::arrow::AsyncGeneratorEnd<RecordBatchGenerator>();
+ }
+ int row_group = row_groups_[index_++];
+ std::vector<int> column_indices = column_indices_;
+ auto reader = arrow_reader_;
+ if (!reader->properties().pre_buffer()) {
+ return SubmitRead(executor_, reader, row_group, column_indices);
+ }
+ auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
+ if (executor_) ready = executor_->Transfer(ready);
+ return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {
+ return ReadOneRowGroup(reader, row_group, column_indices);
+ });
+ }
+
+ private:
+ // Synchronous fallback for when pre-buffer isn't enabled.
+ //
+ // Making the Parquet reader truly asynchronous requires heavy refactoring, so the
+ // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
+ // async I/O without forcing readahead.
+ static ::arrow::Future<RecordBatchGenerator> SubmitRead(
+ ::arrow::internal::Executor* executor, std::shared_ptr<FileReaderImpl> self,
+ const int row_group, const std::vector<int>& column_indices) {
+ if (!executor) {
+ return Future<RecordBatchGenerator>::MakeFinished(
+ ReadOneRowGroup(self, row_group, column_indices));
+ }
+ // If we have an executor, then force transfer (even if I/O was complete)
+ return ::arrow::DeferNotOk(
+ executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+ }
+
+ static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
+ std::shared_ptr<FileReaderImpl> self, const int row_group,
+ const std::vector<int>& column_indices) {
+ std::shared_ptr<::arrow::Table> table;
+ // Skips bounds checks/pre-buffering, since we've done that already
+ RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
+ auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+ ::arrow::RecordBatchVector batches;
+ while (true) {
+ std::shared_ptr<::arrow::RecordBatch> batch;
+ RETURN_NOT_OK(table_reader->ReadNext(&batch));
+ if (!batch) {
+ break;
+ }
+ batches.push_back(batch);
+ }
+ return ::arrow::MakeVectorGenerator(std::move(batches));
+ }
+
+ std::shared_ptr<FileReaderImpl> arrow_reader_;
+ ::arrow::internal::Executor* executor_;
+ std::vector<int> row_groups_;
+ std::vector<int> column_indices_;
+ size_t index_;
+};
+
+::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+ const std::vector<int> row_group_indices,
+ const std::vector<int> column_indices,
+ ::arrow::internal::Executor* executor) {
+ RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
+ if (reader_properties_.pre_buffer()) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(),
+ reader_properties_.cache_options());
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+ ::arrow::AsyncGenerator<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+ row_group_generator = RowGroupGenerator(
+ ::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader), executor,
+ row_group_indices, column_indices);
+ return ::arrow::MakeConcatenatedGenerator(row_group_generator);
+}
+
Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out) {
RETURN_NOT_OK(BoundsCheckColumn(i));
@@ -990,12 +1098,19 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
- ARROW_UNUSED(parquet_reader()->PreBuffer(row_groups, column_indices,
- reader_properties_.io_context(),
- reader_properties_.cache_options()));
+ parquet_reader()->PreBuffer(row_groups, column_indices,
+ reader_properties_.io_context(),
+ reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
+ return DecodeRowGroups(row_groups, column_indices, out);
+}
+
+// Also used by RowGroupGenerator - skip bounds check/pre-buffer to avoid doing that twice
+Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ std::shared_ptr<Table>* out) {
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 765e2f6d39a..45908300dd7 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -21,6 +21,8 @@
#include <memory>
#include <vector>
+#include "arrow/util/async_generator.h"
+#include "arrow/util/optional.h"
#include "parquet/file_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
@@ -178,6 +180,19 @@ class PARQUET_EXPORT FileReader {
const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
std::unique_ptr<::arrow::RecordBatchReader>* out) = 0;
+ /// \brief Return a generator of record batches.
+ ///
+ /// The FileReader must outlive the generator, so this requires that you pass in a
+ /// shared_ptr.
+ ///
+ /// \returns error Result if either row_group_indices or column_indices contains an
+ /// invalid index
+ virtual ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+ GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
+ const std::vector<int> row_group_indices,
+ const std::vector<int> column_indices,
+ ::arrow::internal::Executor* executor = NULLPTR) = 0;
+
::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::shared_ptr<::arrow::RecordBatchReader>* out);
diff --git a/cpp/src/parquet/arrow/reader_writer_benchmark.cc b/cpp/src/parquet/arrow/reader_writer_benchmark.cc
index 6f5d195aad6..42ace70e0fe 100644
--- a/cpp/src/parquet/arrow/reader_writer_benchmark.cc
+++ b/cpp/src/parquet/arrow/reader_writer_benchmark.cc
@@ -33,6 +33,7 @@
#include "arrow/array/builder_primitive.h"
#include "arrow/io/memory.h"
#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/logging.h"
@@ -534,6 +535,7 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
EXIT_NOT_OK(
WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
PARQUET_ASSIGN_OR_THROW(auto buffer, output->Finish());
+ std::vector<int> rgs{0, 2, 4, 6, 8};
while (state.KeepRunning()) {
auto reader =
@@ -541,16 +543,6 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
std::unique_ptr<FileReader> arrow_reader;
EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
&arrow_reader));
-
- std::vector<std::shared_ptr<::arrow::Table>> tables;
- std::vector<int> rgs;
- for (int i = 0; i < arrow_reader->num_row_groups(); i++) {
- // Only read the even numbered RowGroups
- if ((i % 2) == 0) {
- rgs.push_back(i);
- }
- }
-
std::shared_ptr<::arrow::Table> table;
EXIT_NOT_OK(arrow_reader->ReadRowGroups(rgs, &table));
}
@@ -559,6 +551,34 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
BENCHMARK(BM_ReadMultipleRowGroups);
+static void BM_ReadMultipleRowGroupsGenerator(::benchmark::State& state) {
+ std::vector<int64_t> values(BENCHMARK_SIZE, 128);
+ std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
+ auto output = CreateOutputStream();
+ // This writes 10 RowGroups
+ EXIT_NOT_OK(
+ WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
+ PARQUET_ASSIGN_OR_THROW(auto buffer, output->Finish());
+ std::vector<int> rgs{0, 2, 4, 6, 8};
+
+ while (state.KeepRunning()) {
+ auto reader =
+ ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+ std::unique_ptr<FileReader> unique_reader;
+ EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader),
+ &unique_reader));
+ std::shared_ptr<FileReader> arrow_reader = std::move(unique_reader);
+ ASSIGN_OR_ABORT(auto generator,
+ arrow_reader->GetRecordBatchGenerator(arrow_reader, rgs, {0}));
+ auto fut = ::arrow::CollectAsyncGenerator(generator);
+ ASSIGN_OR_ABORT(auto batches, fut.result());
+ ASSIGN_OR_ABORT(auto actual, ::arrow::Table::FromRecordBatches(std::move(batches)));
+ }
+ SetBytesProcessed(state);
+}
+
+BENCHMARK(BM_ReadMultipleRowGroupsGenerator);
+
} // namespace benchmark
} // namespace parquet
diff --git a/cpp/src/parquet/encryption/test_encryption_util.cc b/cpp/src/parquet/encryption/test_encryption_util.cc
index 8fe048e3bcd..217dc234ac7 100644
--- a/cpp/src/parquet/encryption/test_encryption_util.cc
+++ b/cpp/src/parquet/encryption/test_encryption_util.cc
@@ -284,6 +284,7 @@ void FileEncryptor::EncryptFile(
// Close the ParquetFileWriter
file_writer->Close();
+ PARQUET_THROW_NOT_OK(out_file->Close());
return;
} // namespace test
@@ -334,7 +335,10 @@ void FileDecryptor::DecryptFile(
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
}
- auto file_reader = parquet::ParquetFileReader::OpenFile(file, false, reader_properties);
+ std::shared_ptr<::arrow::io::RandomAccessFile> source;
+ PARQUET_ASSIGN_OR_THROW(
+ source, ::arrow::io::ReadableFile::Open(file, reader_properties.memory_pool()));
+ auto file_reader = parquet::ParquetFileReader::Open(source, reader_properties);
// Get the File MetaData
std::shared_ptr<FileMetaData> file_metadata = file_reader->metadata();
@@ -475,6 +479,7 @@ void FileDecryptor::DecryptFile(
ASSERT_EQ(flba_md->num_values(), i);
}
file_reader->Close();
+ PARQUET_THROW_NOT_OK(source->Close());
}
} // namespace test
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 4ff214232e5..3a580f15427 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -258,10 +258,10 @@ class SerializedFile : public ParquetFileReader::Contents {
file_metadata_ = std::move(metadata);
}
- ::arrow::Future<> PreBuffer(const std::vector<int>& row_groups,
- const std::vector<int>& column_indices,
- const ::arrow::io::IOContext& ctx,
- const ::arrow::io::CacheOptions& options) {
+ void PreBuffer(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options) {
cached_source_ =
std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options);
std::vector<::arrow::io::ReadRange> ranges;
@@ -272,10 +272,79 @@ class SerializedFile : public ParquetFileReader::Contents {
}
}
PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
- return cached_source_->Wait();
}
+ ::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices) const {
+ if (!cached_source_) {
+ return ::arrow::Status::Invalid("Must call PreBuffer before WhenBuffered");
+ }
+ std::vector<::arrow::io::ReadRange> ranges;
+ for (int row : row_groups) {
+ for (int col : column_indices) {
+ ranges.push_back(
+ ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col));
+ }
+ }
+ return cached_source_->WaitFor(ranges);
+ }
+
+ // Metadata/footer parsing. Divided up to separate sync/async paths, and to use
+ // exceptions for error handling (with the async path converting to Future/Status).
+
void ParseMetaData() {
+ int64_t footer_read_size = GetFooterReadSize();
+ PARQUET_ASSIGN_OR_THROW(
+ auto footer_buffer,
+ source_->ReadAt(source_size_ - footer_read_size, footer_read_size));
+ uint32_t metadata_len = ParseFooterLength(footer_buffer, footer_read_size);
+ int64_t metadata_start = source_size_ - kFooterSize - metadata_len;
+
+ std::shared_ptr<::arrow::Buffer> metadata_buffer;
+ if (footer_read_size >= (metadata_len + kFooterSize)) {
+ metadata_buffer = SliceBuffer(
+ footer_buffer, footer_read_size - metadata_len - kFooterSize, metadata_len);
+ } else {
+ PARQUET_ASSIGN_OR_THROW(metadata_buffer,
+ source_->ReadAt(metadata_start, metadata_len));
+ }
+
+ // Parse the footer depending on encryption type
+ const bool is_encrypted_footer =
+ memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0;
+ if (is_encrypted_footer) {
+ // Encrypted file with Encrypted footer.
+ const std::pair<int64_t, uint32_t> read_size =
+ ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len);
+ // Read the actual footer
+ metadata_start = read_size.first;
+ metadata_len = read_size.second;
+ PARQUET_ASSIGN_OR_THROW(metadata_buffer,
+ source_->ReadAt(metadata_start, metadata_len));
+ // Fall through
+ }
+
+ const uint32_t read_metadata_len =
+ ParseUnencryptedFileMetadata(metadata_buffer, metadata_len);
+ auto file_decryption_properties = properties_.file_decryption_properties().get();
+ if (is_encrypted_footer) {
+ // Nothing else to do here.
+ return;
+ } else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
+ if (file_decryption_properties != nullptr) {
+ if (!file_decryption_properties->plaintext_files_allowed()) {
+ throw ParquetException("Applying decryption properties on plaintext file");
+ }
+ }
+ } else {
+ // Encrypted file with plaintext footer mode.
+ ParseMetaDataOfEncryptedFileWithPlaintextFooter(
+ file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len);
+ }
+ }
+
+ // Validate the source size and get the initial read size.
+ int64_t GetFooterReadSize() {
if (source_size_ == 0) {
throw ParquetInvalidOrCorruptedFileException("Parquet file size is 0 bytes");
} else if (source_size_ < kFooterSize) {
@@ -283,12 +352,12 @@ class SerializedFile : public ParquetFileReader::Contents {
"Parquet file size is ", source_size_,
" bytes, smaller than the minimum file footer (", kFooterSize, " bytes)");
}
+ return std::min(source_size_, kDefaultFooterReadSize);
+ }
- int64_t footer_read_size = std::min(source_size_, kDefaultFooterReadSize);
- PARQUET_ASSIGN_OR_THROW(
- auto footer_buffer,
- source_->ReadAt(source_size_ - footer_read_size, footer_read_size));
-
+ // Validate the magic bytes and get the length of the full footer.
+ uint32_t ParseFooterLength(const std::shared_ptr<::arrow::Buffer>& footer_buffer,
+ const int64_t footer_read_size) {
// Check if all bytes are read. Check if last 4 bytes read have the magic bits
if (footer_buffer->size() != footer_read_size ||
(memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0 &&
@@ -297,21 +366,94 @@ class SerializedFile : public ParquetFileReader::Contents {
"Parquet magic bytes not found in footer. Either the file is corrupted or this "
"is not a parquet file.");
}
+ // Both encrypted/unencrypted footers have the same footer length check.
+ uint32_t metadata_len = ::arrow::util::SafeLoadAs<uint32_t>(
+ reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
+ kFooterSize);
+ if (metadata_len > source_size_ - kFooterSize) {
+ throw ParquetInvalidOrCorruptedFileException(
+ "Parquet file size is ", source_size_,
+ " bytes, smaller than the size reported by footer's (", metadata_len, "bytes)");
+ }
+ return metadata_len;
+ }
+
+ // Does not throw.
+ ::arrow::Future<> ParseMetaDataAsync() {
+ int64_t footer_read_size;
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ footer_read_size = GetFooterReadSize();
+ END_PARQUET_CATCH_EXCEPTIONS
+ // Assumes this is kept alive externally
+ return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size)
+ .Then([=](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
+ -> ::arrow::Future<> {
+ uint32_t metadata_len;
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ metadata_len = ParseFooterLength(footer_buffer, footer_read_size);
+ END_PARQUET_CATCH_EXCEPTIONS
+ int64_t metadata_start = source_size_ - kFooterSize - metadata_len;
+
+ std::shared_ptr<::arrow::Buffer> metadata_buffer;
+ if (footer_read_size >= (metadata_len + kFooterSize)) {
+ metadata_buffer =
+ SliceBuffer(footer_buffer, footer_read_size - metadata_len - kFooterSize,
+ metadata_len);
+ return ParseMaybeEncryptedMetaDataAsync(footer_buffer,
+ std::move(metadata_buffer),
+ footer_read_size, metadata_len);
+ }
+ return source_->ReadAsync(metadata_start, metadata_len)
+ .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+ return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer,
+ footer_read_size, metadata_len);
+ });
+ });
+ }
- if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
+ // Continuation
+ ::arrow::Future<> ParseMaybeEncryptedMetaDataAsync(
+ std::shared_ptr<::arrow::Buffer> footer_buffer,
+ std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size,
+ uint32_t metadata_len) {
+ // Parse the footer depending on encryption type
+ const bool is_encrypted_footer =
+ memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0;
+ if (is_encrypted_footer) {
// Encrypted file with Encrypted footer.
- ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
- return;
+ std::pair<int64_t, uint32_t> read_size;
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ read_size =
+ ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len);
+ END_PARQUET_CATCH_EXCEPTIONS
+ // Read the actual footer
+ int64_t metadata_start = read_size.first;
+ metadata_len = read_size.second;
+ return source_->ReadAsync(metadata_start, metadata_len)
+ .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
+ // Continue and read the file footer
+ return ParseMetaDataAsync(std::move(footer_buffer), metadata_buffer,
+ footer_read_size, metadata_len,
+ is_encrypted_footer);
+ });
}
+ return ParseMetaDataAsync(std::move(footer_buffer), std::move(metadata_buffer),
+ footer_read_size, metadata_len, is_encrypted_footer);
+ }
- // No encryption or encryption with plaintext footer mode.
- std::shared_ptr<Buffer> metadata_buffer;
- uint32_t metadata_len, read_metadata_len;
- ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
- &metadata_len, &read_metadata_len);
-
+ // Continuation
+ ::arrow::Status ParseMetaDataAsync(std::shared_ptr<::arrow::Buffer> footer_buffer,
+ std::shared_ptr<::arrow::Buffer> metadata_buffer,
+ int64_t footer_read_size, uint32_t metadata_len,
+ const bool is_encrypted_footer) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ const uint32_t read_metadata_len =
+ ParseUnencryptedFileMetadata(metadata_buffer, metadata_len);
auto file_decryption_properties = properties_.file_decryption_properties().get();
- if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
+ if (is_encrypted_footer) {
+ // Nothing else to do here.
+ return ::arrow::Status::OK();
+ } else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
if (file_decryption_properties != nullptr) {
if (!file_decryption_properties->plaintext_files_allowed()) {
throw ParquetException("Applying decryption properties on plaintext file");
@@ -322,6 +464,8 @@ class SerializedFile : public ParquetFileReader::Contents {
ParseMetaDataOfEncryptedFileWithPlaintextFooter(
file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len);
}
+ END_PARQUET_CATCH_EXCEPTIONS
+ return ::arrow::Status::OK();
}
private:
@@ -333,10 +477,9 @@ class SerializedFile : public ParquetFileReader::Contents {
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
- void ParseUnencryptedFileMetadata(const std::shared_ptr<Buffer>& footer_buffer,
- int64_t footer_read_size,
- std::shared_ptr<Buffer>* metadata_buffer,
- uint32_t* metadata_len, uint32_t* read_metadata_len);
+ // \return The true length of the metadata
+ uint32_t ParseUnencryptedFileMetadata(const std::shared_ptr<Buffer>& footer_buffer,
+ const uint32_t metadata_len);
std::string HandleAadPrefix(FileDecryptionProperties* file_decryption_properties,
EncryptionAlgorithm& algo);
@@ -346,68 +489,36 @@ class SerializedFile : public ParquetFileReader::Contents {
const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len,
uint32_t read_metadata_len);
- void ParseMetaDataOfEncryptedFileWithEncryptedFooter(
- const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size);
+ // \return The position and size of the actual footer
+ std::pair<int64_t, uint32_t> ParseMetaDataOfEncryptedFileWithEncryptedFooter(
+ const std::shared_ptr<Buffer>& crypto_metadata_buffer, uint32_t footer_len);
};
-void SerializedFile::ParseUnencryptedFileMetadata(
- const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size,
- std::shared_ptr<Buffer>* metadata_buffer, uint32_t* metadata_len,
- uint32_t* read_metadata_len) {
- *metadata_len = ::arrow::util::SafeLoadAs<uint32_t>(
- reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
- kFooterSize);
- int64_t metadata_start = source_size_ - kFooterSize - *metadata_len;
- if (*metadata_len > source_size_ - kFooterSize) {
- throw ParquetInvalidOrCorruptedFileException(
- "Parquet file size is ", source_size_,
- " bytes, smaller than the size reported by metadata (", metadata_len, "bytes)");
- }
-
- // Check if the footer_buffer contains the entire metadata
- if (footer_read_size >= (*metadata_len + kFooterSize)) {
- *metadata_buffer = SliceBuffer(
- footer_buffer, footer_read_size - *metadata_len - kFooterSize, *metadata_len);
- } else {
- PARQUET_ASSIGN_OR_THROW(*metadata_buffer,
- source_->ReadAt(metadata_start, *metadata_len));
- if ((*metadata_buffer)->size() != *metadata_len) {
- throw ParquetException("Failed reading metadata buffer (requested " +
- std::to_string(*metadata_len) + " bytes but got " +
- std::to_string((*metadata_buffer)->size()) + " bytes)");
- }
+uint32_t SerializedFile::ParseUnencryptedFileMetadata(
+ const std::shared_ptr<Buffer>& metadata_buffer, const uint32_t metadata_len) {
+ if (metadata_buffer->size() != metadata_len) {
+ throw ParquetException("Failed reading metadata buffer (requested " +
+ std::to_string(metadata_len) + " bytes but got " +
+ std::to_string(metadata_buffer->size()) + " bytes)");
}
-
- *read_metadata_len = *metadata_len;
- file_metadata_ = FileMetaData::Make((*metadata_buffer)->data(), read_metadata_len);
+ uint32_t read_metadata_len = metadata_len;
+ // The encrypted read path falls through to here, so pass in the decryptor
+ file_metadata_ =
+ FileMetaData::Make(metadata_buffer->data(), &read_metadata_len, file_decryptor_);
+ return read_metadata_len;
}
-void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(
- const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size) {
+std::pair<int64_t, uint32_t>
+SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(
+ const std::shared_ptr<::arrow::Buffer>& crypto_metadata_buffer,
+ // both metadata & crypto metadata length
+ const uint32_t footer_len) {
// encryption with encrypted footer
- // both metadata & crypto metadata length
- uint32_t footer_len = ::arrow::util::SafeLoadAs<uint32_t>(
- reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size -
- kFooterSize);
- int64_t crypto_metadata_start = source_size_ - kFooterSize - footer_len;
- if (kFooterSize + footer_len > source_size_) {
- throw ParquetInvalidOrCorruptedFileException(
- "Parquet file size is ", source_size_,
- " bytes, smaller than the size reported by footer's (", footer_len, "bytes)");
- }
- std::shared_ptr<Buffer> crypto_metadata_buffer;
// Check if the footer_buffer contains the entire metadata
- if (footer_read_size >= (footer_len + kFooterSize)) {
- crypto_metadata_buffer = SliceBuffer(
- footer_buffer, footer_read_size - footer_len - kFooterSize, footer_len);
- } else {
- PARQUET_ASSIGN_OR_THROW(crypto_metadata_buffer,
- source_->ReadAt(crypto_metadata_start, footer_len));
- if (crypto_metadata_buffer->size() != footer_len) {
- throw ParquetException("Failed reading encrypted metadata buffer (requested " +
- std::to_string(footer_len) + " bytes but got " +
- std::to_string(crypto_metadata_buffer->size()) + " bytes)");
- }
+ if (crypto_metadata_buffer->size() != footer_len) {
+ throw ParquetException("Failed reading encrypted metadata buffer (requested " +
+ std::to_string(footer_len) + " bytes but got " +
+ std::to_string(crypto_metadata_buffer->size()) + " bytes)");
}
auto file_decryption_properties = properties_.file_decryption_properties().get();
if (file_decryption_properties == nullptr) {
@@ -426,16 +537,7 @@ void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter(
int64_t metadata_offset = source_size_ - kFooterSize - footer_len + crypto_metadata_len;
uint32_t metadata_len = footer_len - crypto_metadata_len;
- PARQUET_ASSIGN_OR_THROW(auto metadata_buffer,
- source_->ReadAt(metadata_offset, metadata_len));
- if (metadata_buffer->size() != metadata_len) {
- throw ParquetException("Failed reading metadata buffer (requested " +
- std::to_string(metadata_len) + " bytes but got " +
- std::to_string(metadata_buffer->size()) + " bytes)");
- }
-
- file_metadata_ =
- FileMetaData::Make(metadata_buffer->data(), &metadata_len, file_decryptor_);
+ return std::make_pair(metadata_offset, metadata_len);
}
void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter(
@@ -547,6 +649,33 @@ std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
return result;
}
+::arrow::Future<std::unique_ptr<ParquetFileReader::Contents>>
+ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source,
+ const ReaderProperties& props,
+ std::shared_ptr<FileMetaData> metadata) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ std::unique_ptr<ParquetFileReader::Contents> result(
+ new SerializedFile(std::move(source), props));
+ SerializedFile* file = static_cast<SerializedFile*>(result.get());
+ if (metadata == nullptr) {
+ // TODO(ARROW-12259): workaround since we have Future<(move-only type)>
+ struct {
+ ::arrow::Result<std::unique_ptr<ParquetFileReader::Contents>> operator()() {
+ return std::move(result);
+ }
+
+ std::unique_ptr<ParquetFileReader::Contents> result;
+ } Continuation;
+ Continuation.result = std::move(result);
+ return file->ParseMetaDataAsync().Then(std::move(Continuation));
+ } else {
+ file->set_metadata(std::move(metadata));
+ return ::arrow::Future<std::unique_ptr<ParquetFileReader::Contents>>::MakeFinished(
+ std::move(result));
+ }
+ END_PARQUET_CATCH_EXCEPTIONS
+}
+
std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
std::shared_ptr<FileMetaData> metadata) {
@@ -571,6 +700,28 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
return Open(std::move(source), props, std::move(metadata));
}
+::arrow::Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync(
+ std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
+ std::shared_ptr<FileMetaData> metadata) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata));
+ // TODO(ARROW-12259): workaround since we have Future<(move-only type)>
+ auto completed = ::arrow::Future<std::unique_ptr<ParquetFileReader>>::Make();
+ fut.AddCallback([fut, completed](
+ const ::arrow::Result<std::unique_ptr<ParquetFileReader::Contents>>&
+ contents) mutable {
+ if (!contents.ok()) {
+ completed.MarkFinished(contents.status());
+ return;
+ }
+ std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
+ result->Open(fut.MoveResult().MoveValueUnsafe());
+ completed.MarkFinished(std::move(result));
+ });
+ return completed;
+ END_PARQUET_CATCH_EXCEPTIONS
+}
+
void ParquetFileReader::Open(std::unique_ptr contents) {
contents_ = std::move(contents);
}
@@ -595,14 +746,22 @@ std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
return contents_->GetRowGroup(i);
}
-::arrow::Future<> ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
- const std::vector<int>& column_indices,
- const ::arrow::io::IOContext& ctx,
- const ::arrow::io::CacheOptions& options) {
+void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options) {
+ // Access private methods here
+ SerializedFile* file =
+ ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
+ file->PreBuffer(row_groups, column_indices, ctx, options);
+}
+
+::arrow::Future<> ParquetFileReader::WhenBuffered(
+ const std::vector<int>& row_groups, const std::vector<int>& column_indices) const {
// Access private methods here
SerializedFile* file =
::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
- return file->PreBuffer(row_groups, column_indices, ctx, options);
+ return file->WhenBuffered(row_groups, column_indices);
}
// ----------------------------------------------------------------------
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index de8685c7b90..4bc7ec2353a 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -74,6 +74,11 @@ class PARQUET_EXPORT ParquetFileReader {
const ReaderProperties& props = default_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR);
+ static ::arrow::Future<std::unique_ptr<Contents>> OpenAsync(
+ std::shared_ptr<::arrow::io::RandomAccessFile> source,
+ const ReaderProperties& props = default_reader_properties(),
+ std::shared_ptr<FileMetaData> metadata = NULLPTR);
+
virtual ~Contents() = default;
// Perform any cleanup associated with the file contents
virtual void Close() = 0;
@@ -98,6 +103,13 @@ class PARQUET_EXPORT ParquetFileReader {
const ReaderProperties& props = default_reader_properties(),
std::shared_ptr<FileMetaData> metadata = NULLPTR);
+ // Asynchronously open a file reader from an Arrow file object.
+ // Does not throw - all errors are reported through the Future.
+ static ::arrow::Future<std::unique_ptr<ParquetFileReader>> OpenAsync(
+ std::shared_ptr<::arrow::io::RandomAccessFile> source,
+ const ReaderProperties& props = default_reader_properties(),
+ std::shared_ptr<FileMetaData> metadata = NULLPTR);
+
void Open(std::unique_ptr contents);
void Close();
@@ -125,10 +137,21 @@ class PARQUET_EXPORT ParquetFileReader {
/// buffered in memory until either \a PreBuffer() is called again,
/// or the reader itself is destructed. Reading - and buffering -
/// only one row group at a time may be useful.
- ::arrow::Future<> PreBuffer(const std::vector<int>& row_groups,
- const std::vector<int>& column_indices,
- const ::arrow::io::IOContext& ctx,
- const ::arrow::io::CacheOptions& options);
+ ///
+ /// This method may throw.
+ void PreBuffer(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices,
+ const ::arrow::io::IOContext& ctx,
+ const ::arrow::io::CacheOptions& options);
+
+ /// Wait for the specified row groups and column indices to be pre-buffered.
+ ///
+ /// After the returned Future completes, reading the specified row
+ /// groups/columns will not block.
+ ///
+ /// PreBuffer must be called first. This method does not throw.
+ ::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices) const;
private:
// Holds a pointer to an instance of Contents implementation
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 13ddc78cf11..5018fff9531 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -613,7 +613,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
/// implementation for characteristics of different filesystems.
void set_cache_options(::arrow::io::CacheOptions options) { cache_options_ = options; }
- ::arrow::io::CacheOptions cache_options() const { return cache_options_; }
+ const ::arrow::io::CacheOptions& cache_options() const { return cache_options_; }
/// Set execution context for read coalescing.
void set_io_context(const ::arrow::io::IOContext& ctx) { io_context_ = ctx; }
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 321531bb8f1..2f585e9d94d 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -16,6 +16,7 @@
// under the License.
#include <gtest/gtest.h>
+#include <gmock/gmock.h>
#include <algorithm>
#include <cstdint>
#include <cstdlib>
@@ -26,6 +27,7 @@
#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/io/file.h"
+#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/checked_cast.h"
@@ -559,6 +561,74 @@ TEST(TestFileReader, BufferedReads) {
}
}
+std::unique_ptr<ParquetFileReader> OpenBuffer(const std::string& contents) {
+ auto buffer = ::arrow::Buffer::FromString(contents);
+ return ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+}
+
+::arrow::Future<> OpenBufferAsync(const std::string& contents) {
+ auto buffer = ::arrow::Buffer::FromString(contents);
+ return ::arrow::Future<>(
+ ParquetFileReader::OpenAsync(std::make_shared<::arrow::io::BufferReader>(buffer)));
+}
+
+// https://github.com/google/googletest/pull/2904 not available in our version of
+// gtest/gmock
+#define EXPECT_THROW_THAT(callable, ex_type, property) \
+ EXPECT_THROW( \
+ try { (callable)(); } catch (const ex_type& err) { \
+ EXPECT_THAT(err, property); \
+ throw; \
+ }, \
+ ex_type)
+
+TEST(TestFileReader, TestOpenErrors) {
+ EXPECT_THROW_THAT(
+ []() { OpenBuffer(""); }, ParquetInvalidOrCorruptedFileException,
+ ::testing::Property(&ParquetInvalidOrCorruptedFileException::what,
+ ::testing::HasSubstr("Parquet file size is 0 bytes")));
+ EXPECT_THROW_THAT(
+ []() { OpenBuffer("AAAAPAR0"); }, ParquetInvalidOrCorruptedFileException,
+ ::testing::Property(&ParquetInvalidOrCorruptedFileException::what,
+ ::testing::HasSubstr("Parquet magic bytes not found")));
+ EXPECT_THROW_THAT(
+ []() { OpenBuffer("APAR1"); }, ParquetInvalidOrCorruptedFileException,
+ ::testing::Property(
+ &ParquetInvalidOrCorruptedFileException::what,
+ ::testing::HasSubstr(
+ "Parquet file size is 5 bytes, smaller than the minimum file footer")));
+ EXPECT_THROW_THAT(
+ []() { OpenBuffer("\xFF\xFF\xFF\x0FPAR1"); },
+ ParquetInvalidOrCorruptedFileException,
+ ::testing::Property(&ParquetInvalidOrCorruptedFileException::what,
+ ::testing::HasSubstr("Parquet file size is 8 bytes, smaller "
+ "than the size reported by footer's")));
+ EXPECT_THROW_THAT(
+ []() { OpenBuffer(std::string("\x00\x00\x00\x00PAR1", 8)); }, ParquetException,
+ ::testing::Property(
+ &ParquetException::what,
+ ::testing::HasSubstr("Couldn't deserialize thrift: No more data to read")));
+
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+ IOError, ::testing::HasSubstr("Parquet file size is 0 bytes"), OpenBufferAsync(""));
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+ IOError, ::testing::HasSubstr("Parquet magic bytes not found"),
+ OpenBufferAsync("AAAAPAR0"));
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+ IOError,
+ ::testing::HasSubstr(
+ "Parquet file size is 5 bytes, smaller than the minimum file footer"),
+ OpenBufferAsync("APAR1"));
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+ IOError,
+ ::testing::HasSubstr(
+ "Parquet file size is 8 bytes, smaller than the size reported by footer's"),
+ OpenBufferAsync("\xFF\xFF\xFF\x0FPAR1"));
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+ IOError, ::testing::HasSubstr("Couldn't deserialize thrift: No more data to read"),
+ OpenBufferAsync(std::string("\x00\x00\x00\x00PAR1", 8)));
+}
+
#ifdef ARROW_WITH_LZ4
struct TestCodecParam {
std::string name;
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 356bf8ce9c7..333e2a7612b 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -917,10 +917,10 @@ cdef class Fragment(_Weakrefable):
"""Return the physical schema of this Fragment. This schema can be
different from the dataset read schema."""
cdef:
- shared_ptr[CSchema] c_schema
-
- c_schema = GetResultValue(self.fragment.ReadPhysicalSchema())
- return pyarrow_wrap_schema(c_schema)
+ CResult[shared_ptr[CSchema]] maybe_schema
+ with nogil:
+ maybe_schema = self.fragment.ReadPhysicalSchema()
+ return pyarrow_wrap_schema(GetResultValue(maybe_schema))
@property
def partition_expression(self):
From 0e7b97b1f0fde51ab3677a0a3bb0b6786d1a187f Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 23 Apr 2021 09:47:54 -0400
Subject: [PATCH 3/8] ARROW-11843: [C++][Dataset] Implement
ParquetFileFormat::ScanBatchesAsync
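
ScanBatchesAsync() opens the reader via the new GetReaderAsync(), filters
row groups by statistics up front when the fragment already has cached
metadata, and adapts GetRecordBatchGenerator() with a readahead generator.
A consumption sketch (scanner plumbing elided; `format`, `options`, and
`fragment` are assumed to be set up as usual):

    ARROW_ASSIGN_OR_RAISE(auto gen,
                          format->ScanBatchesAsync(options, fragment));
    // Future resolving to every batch in the fragment, in order:
    auto collected = ::arrow::CollectAsyncGenerator(std::move(gen));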
---
cpp/src/arrow/dataset/file_parquet.cc | 91 +++++++++++++++++++++++++++
cpp/src/arrow/dataset/file_parquet.h | 7 +++
cpp/src/arrow/util/async_generator.h | 8 +++
3 files changed, 106 insertions(+)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 86bea49c22e..462f4f08531 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -333,6 +333,46 @@ Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
return std::move(arrow_reader);
}
+Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
+ const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
+ ARROW_ASSIGN_OR_RAISE(
+ auto parquet_scan_options,
+ GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
+ default_fragment_scan_options));
+ auto properties =
+ MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
+ ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+ // TODO(ARROW-12259): workaround since we have Future<(move-only type)>
+ auto reader_fut =
+ parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties));
+ auto path = source.path();
+ auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
+ return reader_fut.Then(
+ [=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
+ -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+ ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,
+ reader_fut.MoveResult());
+ std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
+ auto arrow_properties = MakeArrowReaderProperties(*self, *metadata);
+ arrow_properties.set_batch_size(options->batch_size);
+ // TODO: ARROW-12597 will let us enable parallel conversion
+ if (!options->use_threads) {
+ arrow_properties.set_use_threads(
+ parquet_scan_options->enable_parallel_column_conversion);
+ }
+ std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+ RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
+ std::move(arrow_properties),
+ &arrow_reader));
+ return std::move(arrow_reader);
+ },
+ [path](
+ const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+ return status.WithMessage("Could not open Parquet input source '", path,
+ "': ", status.message());
+ });
+}
+
Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const {
@@ -390,6 +430,57 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
return MakeVectorIterator(std::move(tasks));
}
+Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
+ const std::shared_ptr<ScanOptions>& options,
+ const std::shared_ptr<FileFragment>& file) const {
+ auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(file);
+ std::vector<int> row_groups;
+ bool pre_filtered = false;
+ // If RowGroup metadata is cached completely we can pre-filter RowGroups before opening
+ // a FileReader, potentially avoiding IO altogether if all RowGroups are excluded due to
+ // prior statistics knowledge. In the case where a RowGroup doesn't have statistics
+ // metadata, it will not be excluded.
+ if (parquet_fragment->metadata() != nullptr) {
+ ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter));
+ pre_filtered = true;
+ if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
+ }
+ // Open the reader and pay the real IO cost.
+ auto make_generator =
+ [=](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
+ -> Result<RecordBatchGenerator> {
+ // Ensure that parquet_fragment has FileMetaData
+ RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
+ if (!pre_filtered) {
+ // row groups were not already filtered; do this now
+ ARROW_ASSIGN_OR_RAISE(row_groups,
+ parquet_fragment->FilterRowGroups(options->filter));
+ if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
+ }
+ auto column_projection = InferColumnProjection(*reader, *options);
+ ARROW_ASSIGN_OR_RAISE(
+ auto parquet_scan_options,
+ GetFragmentScanOptions<ParquetFragmentScanOptions>(
+ kParquetTypeName, options.get(), default_fragment_scan_options));
+ if (parquet_scan_options->arrow_reader_properties->pre_buffer()) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ // Ignore the future here - don't wait for pre-buffering (the generator itself will
+ // wait as necessary)
+ auto io_context = parquet_scan_options->arrow_reader_properties->io_context();
+ auto cache_options = parquet_scan_options->arrow_reader_properties->cache_options();
+ ARROW_UNUSED(reader->parquet_reader()->PreBuffer(row_groups, column_projection,
+ io_context, cache_options));
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+ ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
+ reader, row_groups, column_projection,
+ internal::GetCpuThreadPool()));
+ return MakeReadaheadGenerator(std::move(generator), options->batch_readahead);
+ };
+ return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
+ .Then(std::move(make_generator)));
+}
+
Future<util::optional<int64_t>> ParquetFileFormat::CountRows(
const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
const std::shared_ptr<ScanOptions>& options) {
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index f6505ed6dd2..8286e2776cb 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -99,6 +99,10 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const override;
+ Result<RecordBatchGenerator> ScanBatchesAsync(
+ const std::shared_ptr<ScanOptions>& options,
+ const std::shared_ptr<FileFragment>& file) const override;
+
Future<util::optional<int64_t>> CountRows(
const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
const std::shared_ptr<ScanOptions>& options) override;
@@ -119,6 +123,9 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
const FileSource& source, ScanOptions* = NULLPTR) const;
+ Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
+ const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;
+
Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override;
diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h
index 4cd8a3a9c9d..d975792ea10 100644
--- a/cpp/src/arrow/util/async_generator.h
+++ b/cpp/src/arrow/util/async_generator.h
@@ -1522,6 +1522,14 @@ std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
};
}
+/// \brief Make a generator that immediately ends.
+///
+/// This generator is async-reentrant.
+template <typename T>
+std::function<Future<T>()> MakeEmptyGenerator() {
+ return []() -> Future<T> { return AsyncGeneratorEnd<T>(); };
+}
+
/// \brief Make a generator that always fails with a given error
///
/// This generator is async-reentrant.
From 69b49a83d710feb2b10f29f2c5b4e8e2485bd5a6 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 1 Jun 2021 09:16:10 -0500
Subject: [PATCH 4/8] ARROW-11843: [C++] Address review feedback
---
.../parquet/arrow/arrow_reader_writer_test.cc | 3 ++
cpp/src/parquet/arrow/reader.cc | 30 +++++++++----------
cpp/src/parquet/arrow/reader.h | 3 +-
cpp/src/parquet/file_reader.cc | 15 ++++------
4 files changed, 25 insertions(+), 26 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 3f2cbee6ebd..677458ce37e 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2370,6 +2370,9 @@ TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
ASSERT_EQ(batch3, nullptr);
check_batches(batch1, num_columns, row_group_size);
check_batches(batch2, num_columns, row_group_size);
+ ASSERT_OK_AND_ASSIGN(auto actual, ::arrow::Table::FromRecordBatches(
+ batch1->schema(), {batch1, batch2}));
+ AssertTablesEqual(*table, *actual, /*same_chunk_layout=*/false);
}
{
// No columns case
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 92f7aafe120..14eb7495805 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -326,7 +326,7 @@ class FileReaderImpl : public FileReader {
GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
const std::vector<int> row_group_indices,
const std::vector<int> column_indices,
- ::arrow::internal::Executor* executor) override;
+ ::arrow::internal::Executor* cpu_executor) override;
int num_columns() const { return reader_->metadata()->num_columns(); }
@@ -988,10 +988,10 @@ class RowGroupGenerator {
::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
explicit RowGroupGenerator(std::shared_ptr<FileReaderImpl> arrow_reader,
- ::arrow::internal::Executor* executor,
+ ::arrow::internal::Executor* cpu_executor,
std::vector<int> row_groups, std::vector<int> column_indices)
: arrow_reader_(std::move(arrow_reader)),
- executor_(executor),
+ cpu_executor_(cpu_executor),
row_groups_(std::move(row_groups)),
column_indices_(std::move(column_indices)),
index_(0) {}
@@ -1004,10 +1004,11 @@ class RowGroupGenerator {
std::vector<int> column_indices = column_indices_;
auto reader = arrow_reader_;
if (!reader->properties().pre_buffer()) {
- return SubmitRead(executor_, reader, row_group, column_indices);
+ return SubmitRead(cpu_executor_, reader, row_group, column_indices);
}
auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
- if (executor_) ready = executor_->Transfer(ready);
+ // TODO(ARROW-12916): always transfer here
+ if (cpu_executor_) ready = cpu_executor_->Transfer(ready);
return ready.Then([=]() -> ::arrow::Result<RecordBatchGenerator> {
return ReadOneRowGroup(reader, row_group, column_indices);
});
@@ -1020,15 +1021,15 @@ class RowGroupGenerator {
// generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for
// async I/O without forcing readahead.
static ::arrow::Future<RecordBatchGenerator> SubmitRead(
- ::arrow::internal::Executor* executor, std::shared_ptr<FileReaderImpl> self,
+ ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
const int row_group, const std::vector<int>& column_indices) {
- if (!executor) {
+ if (!cpu_executor) {
return Future<RecordBatchGenerator>::MakeFinished(
ReadOneRowGroup(self, row_group, column_indices));
}
// If we have an executor, then force transfer (even if I/O was complete)
return ::arrow::DeferNotOk(
- executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+ cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
}
static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
@@ -1051,7 +1052,7 @@ class RowGroupGenerator {
}
std::shared_ptr<FileReaderImpl> arrow_reader_;
- ::arrow::internal::Executor* executor_;
+ ::arrow::internal::Executor* cpu_executor_;
std::vector<int> row_groups_;
std::vector<int> column_indices_;
size_t index_;
@@ -1061,7 +1062,7 @@ ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
const std::vector<int> row_group_indices,
const std::vector<int> column_indices,
- ::arrow::internal::Executor* executor) {
+ ::arrow::internal::Executor* cpu_executor) {
RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
@@ -1069,11 +1070,10 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}
- ::arrow::AsyncGenerator<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
- row_group_generator = RowGroupGenerator(
- ::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader), executor,
- row_group_indices, column_indices);
- return ::arrow::MakeConcatenatedGenerator(row_group_generator);
+ ::arrow::AsyncGenerator<RowGroupGenerator::RecordBatchGenerator> row_group_generator =
+ RowGroupGenerator(::arrow::internal::checked_pointer_cast<FileReaderImpl>(reader),
+ cpu_executor, row_group_indices, column_indices);
+ return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator));
}
Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 45908300dd7..b2ca3e14ae4 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -22,7 +22,6 @@
#include <vector>
#include "arrow/util/async_generator.h"
-#include "arrow/util/optional.h"
#include "parquet/file_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
@@ -191,7 +190,7 @@ class PARQUET_EXPORT FileReader {
GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
const std::vector<int> row_group_indices,
const std::vector<int> column_indices,
- ::arrow::internal::Executor* executor = NULLPTR) = 0;
+ ::arrow::internal::Executor* cpu_executor = NULLPTR) = 0;
::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 3a580f15427..9dbfca433ce 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -432,19 +432,16 @@ class SerializedFile : public ParquetFileReader::Contents {
return source_->ReadAsync(metadata_start, metadata_len)
.Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
// Continue and read the file footer
- return ParseMetaDataAsync(std::move(footer_buffer), metadata_buffer,
- footer_read_size, metadata_len,
- is_encrypted_footer);
+ return ParseMetaDataFinal(metadata_buffer, metadata_len, is_encrypted_footer);
});
}
- return ParseMetaDataAsync(std::move(footer_buffer), std::move(metadata_buffer),
- footer_read_size, metadata_len, is_encrypted_footer);
+ return ParseMetaDataFinal(std::move(metadata_buffer), metadata_len,
+ is_encrypted_footer);
}
// Continuation
- ::arrow::Status ParseMetaDataAsync(std::shared_ptr<::arrow::Buffer> footer_buffer,
- std::shared_ptr<::arrow::Buffer> metadata_buffer,
- int64_t footer_read_size, uint32_t metadata_len,
+ ::arrow::Status ParseMetaDataFinal(std::shared_ptr<::arrow::Buffer> metadata_buffer,
+ uint32_t metadata_len,
const bool is_encrypted_footer) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
const uint32_t read_metadata_len =
@@ -477,7 +474,7 @@ class SerializedFile : public ParquetFileReader::Contents {
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
- // \return The true length of the metadata
+ // \return The true length of the metadata in bytes
uint32_t ParseUnencryptedFileMetadata(const std::shared_ptr<Buffer>& footer_buffer,
const uint32_t metadata_len);
From 70a2d996a85c805de12939dcb52bf22afb102654 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 1 Jun 2021 17:57:56 -0500
Subject: [PATCH 5/8] ARROW-11843: [C++] Add test of OpenAsync with encryption
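
Exercises the async open path against encrypted files. A condensed sketch
of what the new test checks (the source file and decryption properties are
the test fixtures' own):

    ReaderProperties reader_properties = default_reader_properties();
    reader_properties.file_decryption_properties(
        file_decryption_properties->DeepClone());
    auto fut = ParquetFileReader::OpenAsync(source, reader_properties);
    ASSERT_FINISHES_OK(fut);  // resolves to std::unique_ptr<ParquetFileReader>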
---
cpp/src/parquet/arrow/reader.h | 6 ++++--
.../encryption/test_encryption_util.cc | 19 +++++++++++++++++--
.../parquet/encryption/test_encryption_util.h | 5 +++++
3 files changed, 26 insertions(+), 4 deletions(-)
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index b2ca3e14ae4..2d6a5ef2c3e 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -18,10 +18,11 @@
#pragma once
#include <cstdint>
+// N.B. we don't include async_generator.h as it's relatively heavy
+#include <functional>
#include <memory>
#include <vector>
-#include "arrow/util/async_generator.h"
#include "parquet/file_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
@@ -186,7 +187,8 @@ class PARQUET_EXPORT FileReader {
///
/// \returns error Result if either row_group_indices or column_indices contains an
/// invalid index
- virtual ::arrow::Result<::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>>
+ virtual ::arrow::Result<
+ std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>>
GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
const std::vector<int> row_group_indices,
const std::vector<int> column_indices,
diff --git a/cpp/src/parquet/encryption/test_encryption_util.cc b/cpp/src/parquet/encryption/test_encryption_util.cc
index 217dc234ac7..8b83154c96c 100644
--- a/cpp/src/parquet/encryption/test_encryption_util.cc
+++ b/cpp/src/parquet/encryption/test_encryption_util.cc
@@ -23,6 +23,7 @@
#include <arrow/io/file.h>
+#include "arrow/testing/future_util.h"
#include "parquet/encryption/test_encryption_util.h"
#include "parquet/file_reader.h"
#include "parquet/file_writer.h"
@@ -338,8 +339,24 @@ void FileDecryptor::DecryptFile(
std::shared_ptr<::arrow::io::RandomAccessFile> source;
PARQUET_ASSIGN_OR_THROW(
source, ::arrow::io::ReadableFile::Open(file, reader_properties.memory_pool()));
+
auto file_reader = parquet::ParquetFileReader::Open(source, reader_properties);
+ CheckFile(file_reader.get(), file_decryption_properties.get());
+
+ if (file_decryption_properties) {
+ reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
+ }
+ auto fut = parquet::ParquetFileReader::OpenAsync(source, reader_properties);
+ ASSERT_FINISHES_OK(fut);
+ ASSERT_OK_AND_ASSIGN(file_reader, fut.MoveResult());
+ CheckFile(file_reader.get(), file_decryption_properties.get());
+ file_reader->Close();
+ PARQUET_THROW_NOT_OK(source->Close());
+}
+
+void FileDecryptor::CheckFile(parquet::ParquetFileReader* file_reader,
+ FileDecryptionProperties* file_decryption_properties) {
// Get the File MetaData
std::shared_ptr<FileMetaData> file_metadata = file_reader->metadata();
@@ -478,8 +495,6 @@ void FileDecryptor::DecryptFile(
// make sure we got the same number of values the metadata says
ASSERT_EQ(flba_md->num_values(), i);
}
- file_reader->Close();
- PARQUET_THROW_NOT_OK(source->Close());
}
} // namespace test
diff --git a/cpp/src/parquet/encryption/test_encryption_util.h b/cpp/src/parquet/encryption/test_encryption_util.h
index 32790950f84..b5d71b9954f 100644
--- a/cpp/src/parquet/encryption/test_encryption_util.h
+++ b/cpp/src/parquet/encryption/test_encryption_util.h
@@ -33,6 +33,7 @@
#include "parquet/test_util.h"
namespace parquet {
+class ParquetFileReader;
namespace encryption {
namespace test {
@@ -106,6 +107,10 @@ class FileDecryptor {
public:
void DecryptFile(std::string file_name,
std::shared_ptr<FileDecryptionProperties> file_decryption_properties);
+
+ private:
+ void CheckFile(parquet::ParquetFileReader* file_reader,
+ FileDecryptionProperties* file_decryption_properties);
};
} // namespace test
From 92ea23b929491340150fbb5b82b7fcaf5be777af Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 3 Jun 2021 09:01:39 -0500
Subject: [PATCH 6/8] ARROW-11843: [C++] Defer pre-buffer handling to reader
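
The sync ScanTask path pre-buffers explicitly, but the async generator can
let the reader do it: GetReaderAsync() now copies the pre-buffer settings
onto the ArrowReaderProperties, and GetRecordBatchGenerator() calls
PreBuffer()/WhenBuffered() internally. A sketch of the handoff (setters as
used in the diff below; the surrounding plumbing is elided):

    auto arrow_properties = MakeArrowReaderProperties(*self, *metadata);
    arrow_properties.set_pre_buffer(true);
    arrow_properties.set_cache_options(cache_options);
    arrow_properties.set_io_context(io_context);
    // The reader now pre-buffers itself; ScanBatchesAsync no longer does.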
---
cpp/src/arrow/dataset/file_parquet.cc | 17 +++++++----------
1 file changed, 7 insertions(+), 10 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 462f4f08531..8c325d21da1 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -355,6 +355,13 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
auto arrow_properties = MakeArrowReaderProperties(*self, *metadata);
arrow_properties.set_batch_size(options->batch_size);
+ // Must be set here since the sync ScanTask handles pre-buffering itself
+ arrow_properties.set_pre_buffer(
+ parquet_scan_options->arrow_reader_properties->pre_buffer());
+ arrow_properties.set_cache_options(
+ parquet_scan_options->arrow_reader_properties->cache_options());
+ arrow_properties.set_io_context(
+ parquet_scan_options->arrow_reader_properties->io_context());
// TODO: ARROW-12597 will let us enable parallel conversion
if (!options->use_threads) {
arrow_properties.set_use_threads(
@@ -462,16 +469,6 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(
kParquetTypeName, options.get(), default_fragment_scan_options));
- if (parquet_scan_options->arrow_reader_properties->pre_buffer()) {
- BEGIN_PARQUET_CATCH_EXCEPTIONS
- // Ignore the future here - don't wait for pre-buffering (the generator itself will
- // wait as necessary)
- auto io_context = parquet_scan_options->arrow_reader_properties->io_context();
- auto cache_options = parquet_scan_options->arrow_reader_properties->cache_options();
- ARROW_UNUSED(reader->parquet_reader()->PreBuffer(row_groups, column_projection,
- io_context, cache_options));
- END_PARQUET_CATCH_EXCEPTIONS
- }
ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
reader, row_groups, column_projection,
internal::GetCpuThreadPool()));
From 65c1fa0a1e77c0bc081ac55d102946dfd68f8230 Mon Sep 17 00:00:00 2001
From: David Li