diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 86bea49c22e..8c325d21da1 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -333,6 +333,53 @@ Result> ParquetFileFormat::GetReader return std::move(arrow_reader); } +Future> ParquetFileFormat::GetReaderAsync( + const FileSource& source, const std::shared_ptr& options) const { + ARROW_ASSIGN_OR_RAISE( + auto parquet_scan_options, + GetFragmentScanOptions(kParquetTypeName, options.get(), + default_fragment_scan_options)); + auto properties = + MakeReaderProperties(*this, parquet_scan_options.get(), options->pool); + ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); + // TODO(ARROW-12259): workaround since we have Future<(move-only type)> + auto reader_fut = + parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties)); + auto path = source.path(); + auto self = checked_pointer_cast(shared_from_this()); + return reader_fut.Then( + [=](const std::unique_ptr&) mutable + -> Result> { + ARROW_ASSIGN_OR_RAISE(std::unique_ptr reader, + reader_fut.MoveResult()); + std::shared_ptr metadata = reader->metadata(); + auto arrow_properties = MakeArrowReaderProperties(*self, *metadata); + arrow_properties.set_batch_size(options->batch_size); + // Must be set here since the sync ScanTask handles pre-buffering itself + arrow_properties.set_pre_buffer( + parquet_scan_options->arrow_reader_properties->pre_buffer()); + arrow_properties.set_cache_options( + parquet_scan_options->arrow_reader_properties->cache_options()); + arrow_properties.set_io_context( + parquet_scan_options->arrow_reader_properties->io_context()); + // TODO: ARROW-12597 will let us enable parallel conversion + if (!options->use_threads) { + arrow_properties.set_use_threads( + parquet_scan_options->enable_parallel_column_conversion); + } + std::unique_ptr arrow_reader; + RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader), + 
std::move(arrow_properties), + &arrow_reader)); + return std::move(arrow_reader); + }, + [path]( + const Status& status) -> Result> { + return status.WithMessage("Could not open Parquet input source '", path, + "': ", status.message()); + }); +} + Result ParquetFileFormat::ScanFile( const std::shared_ptr& options, const std::shared_ptr& fragment) const { @@ -390,6 +437,47 @@ Result ParquetFileFormat::ScanFile( return MakeVectorIterator(std::move(tasks)); } +Result ParquetFileFormat::ScanBatchesAsync( + const std::shared_ptr& options, + const std::shared_ptr& file) const { + auto parquet_fragment = checked_pointer_cast(file); + std::vector row_groups; + bool pre_filtered = false; + // If RowGroup metadata is cached completely we can pre-filter RowGroups before opening + // a FileReader, potentially avoiding IO altogether if all RowGroups are excluded due to + // prior statistics knowledge. In the case where a RowGroup doesn't have statistics + // metadata, it will not be excluded. + if (parquet_fragment->metadata() != nullptr) { + ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter)); + pre_filtered = true; + if (row_groups.empty()) return MakeEmptyGenerator>(); + } + // Open the reader and pay the real IO cost. 
+ auto make_generator = + [=](const std::shared_ptr& reader) mutable + -> Result { + // Ensure that parquet_fragment has FileMetaData + RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get())); + if (!pre_filtered) { + // row groups were not already filtered; do this now + ARROW_ASSIGN_OR_RAISE(row_groups, + parquet_fragment->FilterRowGroups(options->filter)); + if (row_groups.empty()) return MakeEmptyGenerator>(); + } + auto column_projection = InferColumnProjection(*reader, *options); + ARROW_ASSIGN_OR_RAISE( + auto parquet_scan_options, + GetFragmentScanOptions( + kParquetTypeName, options.get(), default_fragment_scan_options)); + ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator( + reader, row_groups, column_projection, + internal::GetCpuThreadPool())); + return MakeReadaheadGenerator(std::move(generator), options->batch_readahead); + }; + return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) + .Then(std::move(make_generator))); +} + Future> ParquetFileFormat::CountRows( const std::shared_ptr& file, compute::Expression predicate, const std::shared_ptr& options) { diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index f6505ed6dd2..8286e2776cb 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -99,6 +99,10 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { const std::shared_ptr& options, const std::shared_ptr& file) const override; + Result ScanBatchesAsync( + const std::shared_ptr& options, + const std::shared_ptr& file) const override; + Future> CountRows( const std::shared_ptr& file, compute::Expression predicate, const std::shared_ptr& options) override; @@ -119,6 +123,9 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { Result> GetReader( const FileSource& source, ScanOptions* = NULLPTR) const; + Future> GetReaderAsync( + const FileSource& source, const std::shared_ptr& options) const; + Result> 
MakeWriter( std::shared_ptr destination, std::shared_ptr schema, std::shared_ptr options) const override; diff --git a/cpp/src/arrow/testing/future_util.h b/cpp/src/arrow/testing/future_util.h index 190e5839bbf..878840587ff 100644 --- a/cpp/src/arrow/testing/future_util.h +++ b/cpp/src/arrow/testing/future_util.h @@ -52,6 +52,13 @@ ASSERT_RAISES(ENUM, _fut.status()); \ } while (false) +#define EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(ENUM, matcher, expr) \ + do { \ + auto&& fut = (expr); \ + ASSERT_FINISHES_IMPL(fut); \ + EXPECT_RAISES_WITH_MESSAGE_THAT(ENUM, matcher, fut.status()); \ + } while (false) + #define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, _future_name) \ auto _future_name = (rexpr); \ ASSERT_FINISHES_IMPL(_future_name); \ diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 4cd8a3a9c9d..d975792ea10 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -1522,6 +1522,14 @@ std::function()> MakeSingleFutureGenerator(Future future) { }; } +/// \brief Make a generator that immediately ends. +/// +/// This generator is async-reentrant. +template +std::function()> MakeEmptyGenerator() { + return []() -> Future { return AsyncGeneratorEnd(); }; +} + /// \brief Make a generator that always fails with a given error /// /// This generator is async-reentrant. diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 132443176ed..cd90d943fcd 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -829,6 +829,9 @@ Future>> All(std::vector> futures) { return out; } +template <> +inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {} + /// \brief Create a Future which completes when all of `futures` complete. 
/// /// The future will be marked complete if all `futures` complete diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 303fb454880..677458ce37e 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2287,10 +2287,9 @@ TEST(TestArrowReadWrite, WaitCoalescedReads) { ASSERT_OK(builder.Open(std::make_shared(buffer))); ASSERT_OK(builder.properties(properties)->Build(&reader)); // Pre-buffer data and wait for I/O to complete. - ASSERT_OK(reader->parquet_reader() - ->PreBuffer({0}, {0, 1, 2, 3, 4}, ::arrow::io::IOContext(), - ::arrow::io::CacheOptions::Defaults()) - .status()); + reader->parquet_reader()->PreBuffer({0}, {0, 1, 2, 3, 4}, ::arrow::io::IOContext(), + ::arrow::io::CacheOptions::Defaults()); + ASSERT_OK(reader->parquet_reader()->WhenBuffered({0}, {0, 1, 2, 3, 4}).status()); std::shared_ptr<::arrow::RecordBatchReader> rb_reader; ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4}, &rb_reader)); @@ -2331,6 +2330,66 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) { ASSERT_EQ(actual_batch->num_rows(), num_rows); } +TEST(TestArrowReadWrite, GetRecordBatchGenerator) { + ArrowReaderProperties properties = default_arrow_reader_properties(); + const int num_rows = 1024; + const int row_group_size = 512; + const int num_columns = 2; + + std::shared_ptr table; + ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); + + std::shared_ptr buffer; + ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size, + default_arrow_writer_properties(), &buffer)); + + std::shared_ptr reader; + { + std::unique_ptr unique_reader; + FileReaderBuilder builder; + ASSERT_OK(builder.Open(std::make_shared(buffer))); + ASSERT_OK(builder.properties(properties)->Build(&unique_reader)); + reader = std::move(unique_reader); + } + + auto check_batches = [](const std::shared_ptr<::arrow::RecordBatch>& batch, 
+ int num_columns, int num_rows) { + ASSERT_NE(batch, nullptr); + ASSERT_EQ(batch->num_columns(), num_columns); + ASSERT_EQ(batch->num_rows(), num_rows); + }; + { + ASSERT_OK_AND_ASSIGN(auto batch_generator, + reader->GetRecordBatchGenerator(reader, {0, 1}, {0, 1})); + auto fut1 = batch_generator(); + auto fut2 = batch_generator(); + auto fut3 = batch_generator(); + ASSERT_OK_AND_ASSIGN(auto batch1, fut1.result()); + ASSERT_OK_AND_ASSIGN(auto batch2, fut2.result()); + ASSERT_OK_AND_ASSIGN(auto batch3, fut3.result()); + ASSERT_EQ(batch3, nullptr); + check_batches(batch1, num_columns, row_group_size); + check_batches(batch2, num_columns, row_group_size); + ASSERT_OK_AND_ASSIGN(auto actual, ::arrow::Table::FromRecordBatches( + batch1->schema(), {batch1, batch2})); + AssertTablesEqual(*table, *actual, /*same_chunk_layout=*/false); + } + { + // No columns case + ASSERT_OK_AND_ASSIGN(auto batch_generator, + reader->GetRecordBatchGenerator(reader, {0, 1}, {})); + auto fut1 = batch_generator(); + auto fut2 = batch_generator(); + auto fut3 = batch_generator(); + ASSERT_OK_AND_ASSIGN(auto batch1, fut1.result()); + ASSERT_OK_AND_ASSIGN(auto batch2, fut2.result()); + ASSERT_OK_AND_ASSIGN(auto batch3, fut3.result()); + ASSERT_EQ(batch3, nullptr); + check_batches(batch1, 0, row_group_size); + check_batches(batch2, 0, row_group_size); + } +} + TEST(TestArrowReadWrite, ScanContents) { const int num_columns = 20; const int num_rows = 1000; @@ -2700,7 +2759,7 @@ TEST(ArrowReadWrite, Decimal256) { auto type = ::arrow::decimal256(8, 4); - const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", + const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", "-9999.9999", "9999.9999"])"; auto array = ::arrow::ArrayFromJSON(type, json); auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 016ceacb0ef..14eb7495805 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ 
b/cpp/src/parquet/arrow/reader.cc @@ -30,7 +30,9 @@ #include "arrow/record_batch.h" #include "arrow/table.h" #include "arrow/type.h" +#include "arrow/util/async_generator.h" #include "arrow/util/bit_util.h" +#include "arrow/util/future.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/make_unique.h" @@ -291,6 +293,11 @@ class FileReaderImpl : public FileReader { const std::vector& indices, std::shared_ptr
* table) override; + // Helper method used by ReadRowGroups/Generator - read the given row groups/columns, + // skipping bounds checks and pre-buffering. + Status DecodeRowGroups(const std::vector& row_groups, + const std::vector& indices, std::shared_ptr
* table); + Status ReadRowGroups(const std::vector& row_groups, std::shared_ptr
* table) override { return ReadRowGroups(row_groups, Iota(reader_->metadata()->num_columns()), table); @@ -315,6 +322,12 @@ class FileReaderImpl : public FileReader { Iota(reader_->metadata()->num_columns()), out); } + ::arrow::Result<::arrow::AsyncGenerator>> + GetRecordBatchGenerator(std::shared_ptr reader, + const std::vector row_group_indices, + const std::vector column_indices, + ::arrow::internal::Executor* cpu_executor) override; + int num_columns() const { return reader_->metadata()->num_columns(); } ParquetFileReader* parquet_reader() const override { return reader_.get(); } @@ -890,9 +903,8 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, if (reader_properties_.pre_buffer()) { // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled BEGIN_PARQUET_CATCH_EXCEPTIONS - ARROW_UNUSED(reader_->PreBuffer(row_groups, column_indices, - reader_properties_.io_context(), - reader_properties_.cache_options())); + reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(), + reader_properties_.cache_options()); END_PARQUET_CATCH_EXCEPTIONS } @@ -968,6 +980,102 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, return Status::OK(); } +/// Given a file reader and a list of row groups, this is a generator of record +/// batch generators (where each sub-generator is the contents of a single row group). 
+class RowGroupGenerator { + public: + using RecordBatchGenerator = + ::arrow::AsyncGenerator>; + + explicit RowGroupGenerator(std::shared_ptr arrow_reader, + ::arrow::internal::Executor* cpu_executor, + std::vector row_groups, std::vector column_indices) + : arrow_reader_(std::move(arrow_reader)), + cpu_executor_(cpu_executor), + row_groups_(std::move(row_groups)), + column_indices_(std::move(column_indices)), + index_(0) {} + + ::arrow::Future operator()() { + if (index_ >= row_groups_.size()) { + return ::arrow::AsyncGeneratorEnd(); + } + int row_group = row_groups_[index_++]; + std::vector column_indices = column_indices_; + auto reader = arrow_reader_; + if (!reader->properties().pre_buffer()) { + return SubmitRead(cpu_executor_, reader, row_group, column_indices); + } + auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); + // TODO(ARROW-12916): always transfer here + if (cpu_executor_) ready = cpu_executor_->Transfer(ready); + return ready.Then([=]() -> ::arrow::Result { + return ReadOneRowGroup(reader, row_group, column_indices); + }); + } + + private: + // Synchronous fallback for when pre-buffer isn't enabled. + // + // Making the Parquet reader truly asynchronous requires heavy refactoring, so the + // generator piggybacks on ReadRangeCache. The lazy ReadRangeCache can be used for + // async I/O without forcing readahead. 
+ static ::arrow::Future SubmitRead( + ::arrow::internal::Executor* cpu_executor, std::shared_ptr self, + const int row_group, const std::vector& column_indices) { + if (!cpu_executor) { + return Future::MakeFinished( + ReadOneRowGroup(self, row_group, column_indices)); + } + // If we have an executor, then force transfer (even if I/O was complete) + return ::arrow::DeferNotOk( + cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices)); + } + + static ::arrow::Result ReadOneRowGroup( + std::shared_ptr self, const int row_group, + const std::vector& column_indices) { + std::shared_ptr<::arrow::Table> table; + // Skips bound checks/pre-buffering, since we've done that already + RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table)); + auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table); + ::arrow::RecordBatchVector batches; + while (true) { + std::shared_ptr<::arrow::RecordBatch> batch; + RETURN_NOT_OK(table_reader->ReadNext(&batch)); + if (!batch) { + break; + } + batches.push_back(batch); + } + return ::arrow::MakeVectorGenerator(std::move(batches)); + } + + std::shared_ptr arrow_reader_; + ::arrow::internal::Executor* cpu_executor_; + std::vector row_groups_; + std::vector column_indices_; + size_t index_; +}; + +::arrow::Result<::arrow::AsyncGenerator>> +FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, + const std::vector row_group_indices, + const std::vector column_indices, + ::arrow::internal::Executor* cpu_executor) { + RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices)); + if (reader_properties_.pre_buffer()) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + reader_->PreBuffer(row_group_indices, column_indices, reader_properties_.io_context(), + reader_properties_.cache_options()); + END_PARQUET_CATCH_EXCEPTIONS + } + ::arrow::AsyncGenerator row_group_generator = + RowGroupGenerator(::arrow::internal::checked_pointer_cast(reader), + cpu_executor, row_group_indices, column_indices); + return 
::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); +} + Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_factory, std::unique_ptr* out) { RETURN_NOT_OK(BoundsCheckColumn(i)); @@ -990,12 +1098,19 @@ Status FileReaderImpl::ReadRowGroups(const std::vector& row_groups, // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled if (reader_properties_.pre_buffer()) { BEGIN_PARQUET_CATCH_EXCEPTIONS - ARROW_UNUSED(parquet_reader()->PreBuffer(row_groups, column_indices, - reader_properties_.io_context(), - reader_properties_.cache_options())); + parquet_reader()->PreBuffer(row_groups, column_indices, + reader_properties_.io_context(), + reader_properties_.cache_options()); END_PARQUET_CATCH_EXCEPTIONS } + return DecodeRowGroups(row_groups, column_indices, out); +} + +// Also used by RowGroupGenerator - skip bounds check/pre-buffer to avoid doing that twice +Status FileReaderImpl::DecodeRowGroups(const std::vector& row_groups, + const std::vector& column_indices, + std::shared_ptr
* out) { std::vector> readers; std::shared_ptr<::arrow::Schema> result_schema; RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema)); diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 765e2f6d39a..2d6a5ef2c3e 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -18,6 +18,8 @@ #pragma once #include +// N.B. we don't include async_generator.h as it's relatively heavy +#include #include #include @@ -178,6 +180,20 @@ class PARQUET_EXPORT FileReader { const std::vector& row_group_indices, const std::vector& column_indices, std::unique_ptr<::arrow::RecordBatchReader>* out) = 0; + /// \brief Return a generator of record batches. + /// + /// The FileReader must outlive the generator, so this requires that you pass in a + /// shared_ptr. + /// + /// \returns error Result if either row_group_indices or column_indices contains an + /// invalid index + virtual ::arrow::Result< + std::function<::arrow::Future>()>> + GetRecordBatchGenerator(std::shared_ptr reader, + const std::vector row_group_indices, + const std::vector column_indices, + ::arrow::internal::Executor* cpu_executor = NULLPTR) = 0; + ::arrow::Status GetRecordBatchReader(const std::vector& row_group_indices, const std::vector& column_indices, std::shared_ptr<::arrow::RecordBatchReader>* out); diff --git a/cpp/src/parquet/arrow/reader_writer_benchmark.cc b/cpp/src/parquet/arrow/reader_writer_benchmark.cc index 6f5d195aad6..6445bb02758 100644 --- a/cpp/src/parquet/arrow/reader_writer_benchmark.cc +++ b/cpp/src/parquet/arrow/reader_writer_benchmark.cc @@ -33,7 +33,9 @@ #include "arrow/array/builder_primitive.h" #include "arrow/io/memory.h" #include "arrow/table.h" +#include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" +#include "arrow/util/async_generator.h" #include "arrow/util/bitmap_ops.h" #include "arrow/util/logging.h" @@ -534,6 +536,7 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) 
{ EXIT_NOT_OK( WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10)); PARQUET_ASSIGN_OR_THROW(auto buffer, output->Finish()); + std::vector rgs{0, 2, 4, 6, 8}; while (state.KeepRunning()) { auto reader = @@ -541,16 +544,6 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) { std::unique_ptr arrow_reader; EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader), &arrow_reader)); - - std::vector> tables; - std::vector rgs; - for (int i = 0; i < arrow_reader->num_row_groups(); i++) { - // Only read the even numbered RowGroups - if ((i % 2) == 0) { - rgs.push_back(i); - } - } - std::shared_ptr<::arrow::Table> table; EXIT_NOT_OK(arrow_reader->ReadRowGroups(rgs, &table)); } @@ -559,6 +552,34 @@ static void BM_ReadMultipleRowGroups(::benchmark::State& state) { BENCHMARK(BM_ReadMultipleRowGroups); +static void BM_ReadMultipleRowGroupsGenerator(::benchmark::State& state) { + std::vector values(BENCHMARK_SIZE, 128); + std::shared_ptr<::arrow::Table> table = TableFromVector(values, true); + auto output = CreateOutputStream(); + // This writes 10 RowGroups + EXIT_NOT_OK( + WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10)); + PARQUET_ASSIGN_OR_THROW(auto buffer, output->Finish()); + std::vector rgs{0, 2, 4, 6, 8}; + + while (state.KeepRunning()) { + auto reader = + ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); + std::unique_ptr unique_reader; + EXIT_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), std::move(reader), + &unique_reader)); + std::shared_ptr arrow_reader = std::move(unique_reader); + ASSIGN_OR_ABORT(auto generator, + arrow_reader->GetRecordBatchGenerator(arrow_reader, rgs, {0})); + auto fut = ::arrow::CollectAsyncGenerator(generator); + ASSIGN_OR_ABORT(auto batches, fut.result()); + ASSIGN_OR_ABORT(auto actual, ::arrow::Table::FromRecordBatches(std::move(batches))); + } + SetBytesProcessed(state); +} + 
+BENCHMARK(BM_ReadMultipleRowGroupsGenerator); + } // namespace benchmark } // namespace parquet diff --git a/cpp/src/parquet/encryption/test_encryption_util.cc b/cpp/src/parquet/encryption/test_encryption_util.cc index 8fe048e3bcd..8b83154c96c 100644 --- a/cpp/src/parquet/encryption/test_encryption_util.cc +++ b/cpp/src/parquet/encryption/test_encryption_util.cc @@ -23,6 +23,7 @@ #include +#include "arrow/testing/future_util.h" #include "parquet/encryption/test_encryption_util.h" #include "parquet/file_reader.h" #include "parquet/file_writer.h" @@ -284,6 +285,7 @@ void FileEncryptor::EncryptFile( // Close the ParquetFileWriter file_writer->Close(); + PARQUET_THROW_NOT_OK(out_file->Close()); return; } // namespace test @@ -334,8 +336,27 @@ void FileDecryptor::DecryptFile( reader_properties.file_decryption_properties(file_decryption_properties->DeepClone()); } - auto file_reader = parquet::ParquetFileReader::OpenFile(file, false, reader_properties); + std::shared_ptr<::arrow::io::RandomAccessFile> source; + PARQUET_ASSIGN_OR_THROW( + source, ::arrow::io::ReadableFile::Open(file, reader_properties.memory_pool())); + auto file_reader = parquet::ParquetFileReader::Open(source, reader_properties); + CheckFile(file_reader.get(), file_decryption_properties.get()); + + if (file_decryption_properties) { + reader_properties.file_decryption_properties(file_decryption_properties->DeepClone()); + } + auto fut = parquet::ParquetFileReader::OpenAsync(source, reader_properties); + ASSERT_FINISHES_OK(fut); + ASSERT_OK_AND_ASSIGN(file_reader, fut.MoveResult()); + CheckFile(file_reader.get(), file_decryption_properties.get()); + + file_reader->Close(); + PARQUET_THROW_NOT_OK(source->Close()); +} + +void FileDecryptor::CheckFile(parquet::ParquetFileReader* file_reader, + FileDecryptionProperties* file_decryption_properties) { // Get the File MetaData std::shared_ptr file_metadata = file_reader->metadata(); @@ -474,7 +495,6 @@ void FileDecryptor::DecryptFile( // make sure we got the 
same number of values the metadata says ASSERT_EQ(flba_md->num_values(), i); } - file_reader->Close(); } } // namespace test diff --git a/cpp/src/parquet/encryption/test_encryption_util.h b/cpp/src/parquet/encryption/test_encryption_util.h index 32790950f84..b5d71b9954f 100644 --- a/cpp/src/parquet/encryption/test_encryption_util.h +++ b/cpp/src/parquet/encryption/test_encryption_util.h @@ -33,6 +33,7 @@ #include "parquet/test_util.h" namespace parquet { +class ParquetFileReader; namespace encryption { namespace test { @@ -106,6 +107,10 @@ class FileDecryptor { public: void DecryptFile(std::string file_name, std::shared_ptr file_decryption_properties); + + private: + void CheckFile(parquet::ParquetFileReader* file_reader, + FileDecryptionProperties* file_decryption_properties); }; } // namespace test diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 4ff214232e5..9dbfca433ce 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -258,10 +258,10 @@ class SerializedFile : public ParquetFileReader::Contents { file_metadata_ = std::move(metadata); } - ::arrow::Future<> PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options) { + void PreBuffer(const std::vector& row_groups, + const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, + const ::arrow::io::CacheOptions& options) { cached_source_ = std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options); std::vector<::arrow::io::ReadRange> ranges; @@ -272,10 +272,79 @@ class SerializedFile : public ParquetFileReader::Contents { } } PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges)); - return cached_source_->Wait(); } + ::arrow::Future<> WhenBuffered(const std::vector& row_groups, + const std::vector& column_indices) const { + if (!cached_source_) { + return ::arrow::Status::Invalid("Must call PreBuffer before WhenBuffered"); 
+ } + std::vector<::arrow::io::ReadRange> ranges; + for (int row : row_groups) { + for (int col : column_indices) { + ranges.push_back( + ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); + } + } + return cached_source_->WaitFor(ranges); + } + + // Metadata/footer parsing. Divided up to separate sync/async paths, and to use + // exceptions for error handling (with the async path converting to Future/Status). + void ParseMetaData() { + int64_t footer_read_size = GetFooterReadSize(); + PARQUET_ASSIGN_OR_THROW( + auto footer_buffer, + source_->ReadAt(source_size_ - footer_read_size, footer_read_size)); + uint32_t metadata_len = ParseFooterLength(footer_buffer, footer_read_size); + int64_t metadata_start = source_size_ - kFooterSize - metadata_len; + + std::shared_ptr<::arrow::Buffer> metadata_buffer; + if (footer_read_size >= (metadata_len + kFooterSize)) { + metadata_buffer = SliceBuffer( + footer_buffer, footer_read_size - metadata_len - kFooterSize, metadata_len); + } else { + PARQUET_ASSIGN_OR_THROW(metadata_buffer, + source_->ReadAt(metadata_start, metadata_len)); + } + + // Parse the footer depending on encryption type + const bool is_encrypted_footer = + memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0; + if (is_encrypted_footer) { + // Encrypted file with Encrypted footer. + const std::pair read_size = + ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len); + // Read the actual footer + metadata_start = read_size.first; + metadata_len = read_size.second; + PARQUET_ASSIGN_OR_THROW(metadata_buffer, + source_->ReadAt(metadata_start, metadata_len)); + // Fall through + } + + const uint32_t read_metadata_len = + ParseUnencryptedFileMetadata(metadata_buffer, metadata_len); + auto file_decryption_properties = properties_.file_decryption_properties().get(); + if (is_encrypted_footer) { + // Nothing else to do here. 
+ return; + } else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file. + if (file_decryption_properties != nullptr) { + if (!file_decryption_properties->plaintext_files_allowed()) { + throw ParquetException("Applying decryption properties on plaintext file"); + } + } + } else { + // Encrypted file with plaintext footer mode. + ParseMetaDataOfEncryptedFileWithPlaintextFooter( + file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len); + } + } + + // Validate the source size and get the initial read size. + int64_t GetFooterReadSize() { if (source_size_ == 0) { throw ParquetInvalidOrCorruptedFileException("Parquet file size is 0 bytes"); } else if (source_size_ < kFooterSize) { @@ -283,12 +352,12 @@ class SerializedFile : public ParquetFileReader::Contents { "Parquet file size is ", source_size_, " bytes, smaller than the minimum file footer (", kFooterSize, " bytes)"); } + return std::min(source_size_, kDefaultFooterReadSize); + } - int64_t footer_read_size = std::min(source_size_, kDefaultFooterReadSize); - PARQUET_ASSIGN_OR_THROW( - auto footer_buffer, - source_->ReadAt(source_size_ - footer_read_size, footer_read_size)); - + // Validate the magic bytes and get the length of the full footer. + uint32_t ParseFooterLength(const std::shared_ptr<::arrow::Buffer>& footer_buffer, + const int64_t footer_read_size) { // Check if all bytes are read. Check if last 4 bytes read have the magic bits if (footer_buffer->size() != footer_read_size || (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0 && @@ -297,21 +366,91 @@ class SerializedFile : public ParquetFileReader::Contents { "Parquet magic bytes not found in footer. Either the file is corrupted or this " "is not a parquet file."); } + // Both encrypted/unencrypted footers have the same footer length check. 
+ uint32_t metadata_len = ::arrow::util::SafeLoadAs( + reinterpret_cast(footer_buffer->data()) + footer_read_size - + kFooterSize); + if (metadata_len > source_size_ - kFooterSize) { + throw ParquetInvalidOrCorruptedFileException( + "Parquet file size is ", source_size_, + " bytes, smaller than the size reported by footer's (", metadata_len, "bytes)"); + } + return metadata_len; + } + + // Does not throw. + ::arrow::Future<> ParseMetaDataAsync() { + int64_t footer_read_size; + BEGIN_PARQUET_CATCH_EXCEPTIONS + footer_read_size = GetFooterReadSize(); + END_PARQUET_CATCH_EXCEPTIONS + // Assumes this is kept alive externally + return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size) + .Then([=](const std::shared_ptr<::arrow::Buffer>& footer_buffer) + -> ::arrow::Future<> { + uint32_t metadata_len; + BEGIN_PARQUET_CATCH_EXCEPTIONS + metadata_len = ParseFooterLength(footer_buffer, footer_read_size); + END_PARQUET_CATCH_EXCEPTIONS + int64_t metadata_start = source_size_ - kFooterSize - metadata_len; + + std::shared_ptr<::arrow::Buffer> metadata_buffer; + if (footer_read_size >= (metadata_len + kFooterSize)) { + metadata_buffer = + SliceBuffer(footer_buffer, footer_read_size - metadata_len - kFooterSize, + metadata_len); + return ParseMaybeEncryptedMetaDataAsync(footer_buffer, + std::move(metadata_buffer), + footer_read_size, metadata_len); + } + return source_->ReadAsync(metadata_start, metadata_len) + .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) { + return ParseMaybeEncryptedMetaDataAsync(footer_buffer, metadata_buffer, + footer_read_size, metadata_len); + }); + }); + } - if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) { + // Continuation + ::arrow::Future<> ParseMaybeEncryptedMetaDataAsync( + std::shared_ptr<::arrow::Buffer> footer_buffer, + std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size, + uint32_t metadata_len) { + // Parse the footer depending on encryption type + 
const bool is_encrypted_footer = + memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0; + if (is_encrypted_footer) { // Encrypted file with Encrypted footer. - ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size); - return; + std::pair read_size; + BEGIN_PARQUET_CATCH_EXCEPTIONS + read_size = + ParseMetaDataOfEncryptedFileWithEncryptedFooter(metadata_buffer, metadata_len); + END_PARQUET_CATCH_EXCEPTIONS + // Read the actual footer + int64_t metadata_start = read_size.first; + metadata_len = read_size.second; + return source_->ReadAsync(metadata_start, metadata_len) + .Then([=](const std::shared_ptr<::arrow::Buffer>& metadata_buffer) { + // Continue and read the file footer + return ParseMetaDataFinal(metadata_buffer, metadata_len, is_encrypted_footer); + }); } + return ParseMetaDataFinal(std::move(metadata_buffer), metadata_len, + is_encrypted_footer); + } - // No encryption or encryption with plaintext footer mode. - std::shared_ptr metadata_buffer; - uint32_t metadata_len, read_metadata_len; - ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer, - &metadata_len, &read_metadata_len); - + // Continuation + ::arrow::Status ParseMetaDataFinal(std::shared_ptr<::arrow::Buffer> metadata_buffer, + uint32_t metadata_len, + const bool is_encrypted_footer) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + const uint32_t read_metadata_len = + ParseUnencryptedFileMetadata(metadata_buffer, metadata_len); auto file_decryption_properties = properties_.file_decryption_properties().get(); - if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file. + if (is_encrypted_footer) { + // Nothing else to do here. + return ::arrow::Status::OK(); + } else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file. 
if (file_decryption_properties != nullptr) { if (!file_decryption_properties->plaintext_files_allowed()) { throw ParquetException("Applying decryption properties on plaintext file"); @@ -322,6 +461,8 @@ class SerializedFile : public ParquetFileReader::Contents { ParseMetaDataOfEncryptedFileWithPlaintextFooter( file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len); } + END_PARQUET_CATCH_EXCEPTIONS + return ::arrow::Status::OK(); } private: @@ -333,10 +474,9 @@ class SerializedFile : public ParquetFileReader::Contents { std::shared_ptr file_decryptor_; - void ParseUnencryptedFileMetadata(const std::shared_ptr& footer_buffer, - int64_t footer_read_size, - std::shared_ptr* metadata_buffer, - uint32_t* metadata_len, uint32_t* read_metadata_len); + // \return The true length of the metadata in bytes + uint32_t ParseUnencryptedFileMetadata(const std::shared_ptr& footer_buffer, + const uint32_t metadata_len); std::string HandleAadPrefix(FileDecryptionProperties* file_decryption_properties, EncryptionAlgorithm& algo); @@ -346,68 +486,36 @@ class SerializedFile : public ParquetFileReader::Contents { const std::shared_ptr& metadata_buffer, uint32_t metadata_len, uint32_t read_metadata_len); - void ParseMetaDataOfEncryptedFileWithEncryptedFooter( - const std::shared_ptr& footer_buffer, int64_t footer_read_size); + // \return The position and size of the actual footer + std::pair ParseMetaDataOfEncryptedFileWithEncryptedFooter( + const std::shared_ptr& crypto_metadata_buffer, uint32_t footer_len); }; -void SerializedFile::ParseUnencryptedFileMetadata( - const std::shared_ptr& footer_buffer, int64_t footer_read_size, - std::shared_ptr* metadata_buffer, uint32_t* metadata_len, - uint32_t* read_metadata_len) { - *metadata_len = ::arrow::util::SafeLoadAs( - reinterpret_cast(footer_buffer->data()) + footer_read_size - - kFooterSize); - int64_t metadata_start = source_size_ - kFooterSize - *metadata_len; - if (*metadata_len > source_size_ - kFooterSize) { - 
throw ParquetInvalidOrCorruptedFileException( - "Parquet file size is ", source_size_, - " bytes, smaller than the size reported by metadata (", metadata_len, "bytes)"); - } - - // Check if the footer_buffer contains the entire metadata - if (footer_read_size >= (*metadata_len + kFooterSize)) { - *metadata_buffer = SliceBuffer( - footer_buffer, footer_read_size - *metadata_len - kFooterSize, *metadata_len); - } else { - PARQUET_ASSIGN_OR_THROW(*metadata_buffer, - source_->ReadAt(metadata_start, *metadata_len)); - if ((*metadata_buffer)->size() != *metadata_len) { - throw ParquetException("Failed reading metadata buffer (requested " + - std::to_string(*metadata_len) + " bytes but got " + - std::to_string((*metadata_buffer)->size()) + " bytes)"); - } +uint32_t SerializedFile::ParseUnencryptedFileMetadata( + const std::shared_ptr& metadata_buffer, const uint32_t metadata_len) { + if (metadata_buffer->size() != metadata_len) { + throw ParquetException("Failed reading metadata buffer (requested " + + std::to_string(metadata_len) + " bytes but got " + + std::to_string(metadata_buffer->size()) + " bytes)"); } - - *read_metadata_len = *metadata_len; - file_metadata_ = FileMetaData::Make((*metadata_buffer)->data(), read_metadata_len); + uint32_t read_metadata_len = metadata_len; + // The encrypted read path falls through to here, so pass in the decryptor + file_metadata_ = + FileMetaData::Make(metadata_buffer->data(), &read_metadata_len, file_decryptor_); + return read_metadata_len; } -void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter( - const std::shared_ptr& footer_buffer, int64_t footer_read_size) { +std::pair +SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter( + const std::shared_ptr<::arrow::Buffer>& crypto_metadata_buffer, + // both metadata & crypto metadata length + const uint32_t footer_len) { // encryption with encrypted footer - // both metadata & crypto metadata length - uint32_t footer_len = ::arrow::util::SafeLoadAs( - 
reinterpret_cast(footer_buffer->data()) + footer_read_size - - kFooterSize); - int64_t crypto_metadata_start = source_size_ - kFooterSize - footer_len; - if (kFooterSize + footer_len > source_size_) { - throw ParquetInvalidOrCorruptedFileException( - "Parquet file size is ", source_size_, - " bytes, smaller than the size reported by footer's (", footer_len, "bytes)"); - } - std::shared_ptr crypto_metadata_buffer; // Check if the footer_buffer contains the entire metadata - if (footer_read_size >= (footer_len + kFooterSize)) { - crypto_metadata_buffer = SliceBuffer( - footer_buffer, footer_read_size - footer_len - kFooterSize, footer_len); - } else { - PARQUET_ASSIGN_OR_THROW(crypto_metadata_buffer, - source_->ReadAt(crypto_metadata_start, footer_len)); - if (crypto_metadata_buffer->size() != footer_len) { - throw ParquetException("Failed reading encrypted metadata buffer (requested " + - std::to_string(footer_len) + " bytes but got " + - std::to_string(crypto_metadata_buffer->size()) + " bytes)"); - } + if (crypto_metadata_buffer->size() != footer_len) { + throw ParquetException("Failed reading encrypted metadata buffer (requested " + + std::to_string(footer_len) + " bytes but got " + + std::to_string(crypto_metadata_buffer->size()) + " bytes)"); } auto file_decryption_properties = properties_.file_decryption_properties().get(); if (file_decryption_properties == nullptr) { @@ -426,16 +534,7 @@ void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter( int64_t metadata_offset = source_size_ - kFooterSize - footer_len + crypto_metadata_len; uint32_t metadata_len = footer_len - crypto_metadata_len; - PARQUET_ASSIGN_OR_THROW(auto metadata_buffer, - source_->ReadAt(metadata_offset, metadata_len)); - if (metadata_buffer->size() != metadata_len) { - throw ParquetException("Failed reading metadata buffer (requested " + - std::to_string(metadata_len) + " bytes but got " + - std::to_string(metadata_buffer->size()) + " bytes)"); - } - - file_metadata_ = - 
FileMetaData::Make(metadata_buffer->data(), &metadata_len, file_decryptor_); + return std::make_pair(metadata_offset, metadata_len); } void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter( @@ -547,6 +646,33 @@ std::unique_ptr ParquetFileReader::Contents::Open( return result; } +::arrow::Future> +ParquetFileReader::Contents::OpenAsync(std::shared_ptr source, + const ReaderProperties& props, + std::shared_ptr metadata) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + std::unique_ptr result( + new SerializedFile(std::move(source), props)); + SerializedFile* file = static_cast(result.get()); + if (metadata == nullptr) { + // TODO(ARROW-12259): workaround since we have Future<(move-only type)> + struct { + ::arrow::Result> operator()() { + return std::move(result); + } + + std::unique_ptr result; + } Continuation; + Continuation.result = std::move(result); + return file->ParseMetaDataAsync().Then(std::move(Continuation)); + } else { + file->set_metadata(std::move(metadata)); + return ::arrow::Future>::MakeFinished( + std::move(result)); + } + END_PARQUET_CATCH_EXCEPTIONS +} + std::unique_ptr ParquetFileReader::Open( std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props, std::shared_ptr metadata) { @@ -571,6 +697,28 @@ std::unique_ptr ParquetFileReader::OpenFile( return Open(std::move(source), props, std::move(metadata)); } +::arrow::Future> ParquetFileReader::OpenAsync( + std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props, + std::shared_ptr metadata) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata)); + // TODO(ARROW-12259): workaround since we have Future<(move-only type)> + auto completed = ::arrow::Future>::Make(); + fut.AddCallback([fut, completed]( + const ::arrow::Result>& + contents) mutable { + if (!contents.ok()) { + completed.MarkFinished(contents.status()); + return; + } + std::unique_ptr result(new ParquetFileReader()); + 
result->Open(fut.MoveResult().MoveValueUnsafe()); + completed.MarkFinished(std::move(result)); + }); + return completed; + END_PARQUET_CATCH_EXCEPTIONS +} + void ParquetFileReader::Open(std::unique_ptr contents) { contents_ = std::move(contents); } @@ -595,14 +743,22 @@ std::shared_ptr ParquetFileReader::RowGroup(int i) { return contents_->GetRowGroup(i); } -::arrow::Future<> ParquetFileReader::PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options) { +void ParquetFileReader::PreBuffer(const std::vector& row_groups, + const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, + const ::arrow::io::CacheOptions& options) { + // Access private methods here + SerializedFile* file = + ::arrow::internal::checked_cast(contents_.get()); + file->PreBuffer(row_groups, column_indices, ctx, options); +} + +::arrow::Future<> ParquetFileReader::WhenBuffered( + const std::vector& row_groups, const std::vector& column_indices) const { // Access private methods here SerializedFile* file = ::arrow::internal::checked_cast(contents_.get()); - return file->PreBuffer(row_groups, column_indices, ctx, options); + return file->WhenBuffered(row_groups, column_indices); } // ---------------------------------------------------------------------- diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index de8685c7b90..4bc7ec2353a 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -74,6 +74,11 @@ class PARQUET_EXPORT ParquetFileReader { const ReaderProperties& props = default_reader_properties(), std::shared_ptr metadata = NULLPTR); + static ::arrow::Future> OpenAsync( + std::shared_ptr<::arrow::io::RandomAccessFile> source, + const ReaderProperties& props = default_reader_properties(), + std::shared_ptr metadata = NULLPTR); + virtual ~Contents() = default; // Perform any cleanup associated with the file contents virtual void 
Close() = 0; @@ -98,6 +103,13 @@ class PARQUET_EXPORT ParquetFileReader { const ReaderProperties& props = default_reader_properties(), std::shared_ptr metadata = NULLPTR); + // Asynchronously open a file reader from an Arrow file object. + // Does not throw - all errors are reported through the Future. + static ::arrow::Future> OpenAsync( + std::shared_ptr<::arrow::io::RandomAccessFile> source, + const ReaderProperties& props = default_reader_properties(), + std::shared_ptr metadata = NULLPTR); + void Open(std::unique_ptr contents); void Close(); @@ -125,10 +137,21 @@ class PARQUET_EXPORT ParquetFileReader { /// buffered in memory until either \a PreBuffer() is called again, /// or the reader itself is destructed. Reading - and buffering - /// only one row group at a time may be useful. - ::arrow::Future<> PreBuffer(const std::vector& row_groups, - const std::vector& column_indices, - const ::arrow::io::IOContext& ctx, - const ::arrow::io::CacheOptions& options); + /// + /// This method may throw. + void PreBuffer(const std::vector& row_groups, + const std::vector& column_indices, + const ::arrow::io::IOContext& ctx, + const ::arrow::io::CacheOptions& options); + + /// Wait for the specified row groups and column indices to be pre-buffered. + /// + /// After the returned Future completes, reading the specified row + /// groups/columns will not block. + /// + /// PreBuffer must be called first. This method does not throw. + ::arrow::Future<> WhenBuffered(const std::vector& row_groups, + const std::vector& column_indices) const; private: // Holds a pointer to an instance of Contents implementation diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 13ddc78cf11..5018fff9531 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -613,7 +613,7 @@ class PARQUET_EXPORT ArrowReaderProperties { /// implementation for characteristics of different filesystems. 
void set_cache_options(::arrow::io::CacheOptions options) { cache_options_ = options; } - ::arrow::io::CacheOptions cache_options() const { return cache_options_; } + const ::arrow::io::CacheOptions& cache_options() const { return cache_options_; } /// Set execution context for read coalescing. void set_io_context(const ::arrow::io::IOContext& ctx) { io_context_ = ctx; } diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index 321531bb8f1..9bbcda3cf1f 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include #include @@ -26,6 +27,7 @@ #include "arrow/array.h" #include "arrow/buffer.h" #include "arrow/io/file.h" +#include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/util/checked_cast.h" @@ -559,6 +561,76 @@ TEST(TestFileReader, BufferedReads) { } } +std::unique_ptr OpenBuffer(const std::string& contents) { + auto buffer = ::arrow::Buffer::FromString(contents); + return ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); +} + +::arrow::Future<> OpenBufferAsync(const std::string& contents) { + auto buffer = ::arrow::Buffer::FromString(contents); + return ::arrow::Future<>( + ParquetFileReader::OpenAsync(std::make_shared<::arrow::io::BufferReader>(buffer))); +} + +// https://github.com/google/googletest/pull/2904 not available in our version of +// gtest/gmock +#define EXPECT_THROW_THAT(callable, ex_type, property) \ + EXPECT_THROW( \ + try { (callable)(); } catch (const ex_type& err) { \ + EXPECT_THAT(err, (property)); \ + throw; \ + }, \ + ex_type) + +TEST(TestFileReader, TestOpenErrors) { + EXPECT_THROW_THAT( + []() { OpenBuffer(""); }, ParquetInvalidOrCorruptedFileException, + ::testing::Property(&ParquetInvalidOrCorruptedFileException::what, + ::testing::HasSubstr("Parquet file size is 0 bytes"))); + EXPECT_THROW_THAT( + []() { 
OpenBuffer("AAAAPAR0"); }, ParquetInvalidOrCorruptedFileException, + ::testing::Property(&ParquetInvalidOrCorruptedFileException::what, + ::testing::HasSubstr("Parquet magic bytes not found"))); + EXPECT_THROW_THAT( + []() { OpenBuffer("APAR1"); }, ParquetInvalidOrCorruptedFileException, + ::testing::Property( + &ParquetInvalidOrCorruptedFileException::what, + ::testing::HasSubstr( + "Parquet file size is 5 bytes, smaller than the minimum file footer"))); + EXPECT_THROW_THAT( + []() { OpenBuffer("\xFF\xFF\xFF\x0FPAR1"); }, + ParquetInvalidOrCorruptedFileException, + ::testing::Property(&ParquetInvalidOrCorruptedFileException::what, + ::testing::HasSubstr("Parquet file size is 8 bytes, smaller " + "than the size reported by footer's"))); + EXPECT_THROW_THAT( + []() { OpenBuffer(std::string("\x00\x00\x00\x00PAR1", 8)); }, ParquetException, + ::testing::Property( + &ParquetException::what, + ::testing::HasSubstr("Couldn't deserialize thrift: No more data to read"))); + + EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( + IOError, ::testing::HasSubstr("Parquet file size is 0 bytes"), OpenBufferAsync("")); + EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( + IOError, ::testing::HasSubstr("Parquet magic bytes not found"), + OpenBufferAsync("AAAAPAR0")); + EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr( + "Parquet file size is 5 bytes, smaller than the minimum file footer"), + OpenBufferAsync("APAR1")); + EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( + IOError, + ::testing::HasSubstr( + "Parquet file size is 8 bytes, smaller than the size reported by footer's"), + OpenBufferAsync("\xFF\xFF\xFF\x0FPAR1")); + EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT( + IOError, ::testing::HasSubstr("Couldn't deserialize thrift: No more data to read"), + OpenBufferAsync(std::string("\x00\x00\x00\x00PAR1", 8))); +} + +#undef EXPECT_THROW_THAT + #ifdef ARROW_WITH_LZ4 struct TestCodecParam { std::string name; diff --git a/python/pyarrow/_dataset.pyx 
b/python/pyarrow/_dataset.pyx index 356bf8ce9c7..333e2a7612b 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -917,10 +917,10 @@ cdef class Fragment(_Weakrefable): """Return the physical schema of this Fragment. This schema can be different from the dataset read schema.""" cdef: - shared_ptr[CSchema] c_schema - - c_schema = GetResultValue(self.fragment.ReadPhysicalSchema()) - return pyarrow_wrap_schema(c_schema) + CResult[shared_ptr[CSchema]] maybe_schema + with nogil: + maybe_schema = self.fragment.ReadPhysicalSchema() + return pyarrow_wrap_schema(GetResultValue(maybe_schema)) @property def partition_expression(self):