diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 3c27bd2b00e..698e33ca7a5 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -347,11 +347,23 @@ Result> ParquetFileFormat::Inspect( Result> ParquetFileFormat::GetReader( const FileSource& source, const std::shared_ptr& options) const { - return GetReaderAsync(source, options).result(); + return GetReaderAsync(source, options, nullptr).result(); +} + +Result> ParquetFileFormat::GetReader( + const FileSource& source, const std::shared_ptr& options, + const std::shared_ptr& metadata) const { + return GetReaderAsync(source, options, metadata).result(); } Future> ParquetFileFormat::GetReaderAsync( const FileSource& source, const std::shared_ptr& options) const { + return GetReaderAsync(source, options, nullptr); +} + +Future> ParquetFileFormat::GetReaderAsync( + const FileSource& source, const std::shared_ptr& options, + const std::shared_ptr& metadata) const { ARROW_ASSIGN_OR_RAISE( auto parquet_scan_options, GetFragmentScanOptions(kParquetTypeName, options.get(), @@ -360,8 +372,8 @@ Future> ParquetFileFormat::GetReader MakeReaderProperties(*this, parquet_scan_options.get(), options->pool); ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); // TODO(ARROW-12259): workaround since we have Future<(move-only type)> - auto reader_fut = - parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties)); + auto reader_fut = parquet::ParquetFileReader::OpenAsync( + std::move(input), std::move(properties), metadata); auto path = source.path(); auto self = checked_pointer_cast(shared_from_this()); return reader_fut.Then( @@ -443,7 +455,7 @@ Result ParquetFileFormat::ScanBatchesAsync( // If RowGroup metadata is cached completely we can pre-filter RowGroups before opening // a FileReader, potentially avoiding IO altogether if all RowGroups are excluded due to // prior statistics knowledge. In the case where a RowGroup doesn't have statistics - // metdata, it will not be excluded. + // metadata, it will not be excluded. if (parquet_fragment->metadata() != nullptr) { ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter)); pre_filtered = true; @@ -483,8 +495,9 @@ Result ParquetFileFormat::ScanBatchesAsync( MakeSerialReadaheadGenerator(std::move(sliced), batch_readahead); return sliced_readahead; }; - auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) - .Then(std::move(make_generator))); + auto generator = MakeFromFuture( + GetReaderAsync(parquet_fragment->source(), options, parquet_fragment->metadata()) + .Then(std::move(make_generator))); WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN( generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); return generator; diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 1087fb9f9de..ac161234349 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -119,9 +119,17 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { Result> GetReader( const FileSource& source, const std::shared_ptr& options) const; + Result> GetReader( + const FileSource& source, const std::shared_ptr& options, + const std::shared_ptr& metadata) const; + Future> GetReaderAsync( const FileSource& source, const std::shared_ptr& options) const; + Future> GetReaderAsync( + const FileSource& source, const std::shared_ptr& options, + const std::shared_ptr& metadata) const; + Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, std::shared_ptr options, diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index ed495366ace..e414a844a2f 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -25,6 +25,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/io/memory.h" +#include "arrow/io/test_common.h" #include "arrow/io/util_internal.h" #include "arrow/record_batch.h" #include "arrow/table.h" @@ -292,6 +293,51 @@ TEST_F(TestParquetFileFormat, CountRowsPredicatePushdown) { } } +TEST_F(TestParquetFileFormat, CachedMetadata) { + // Create a test file + auto mock_fs = std::make_shared(fs::kNoTime); + std::shared_ptr test_schema = schema({field("x", int32())}); + std::shared_ptr batch = RecordBatchFromJSON(test_schema, "[[0]]"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr out_stream, + mock_fs->OpenOutputStream("/foo.parquet")); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr writer, + format_->MakeWriter(out_stream, test_schema, format_->DefaultWriteOptions(), + {mock_fs, "/foo.parquet"})); + ASSERT_OK(writer->Write(batch)); + ASSERT_FINISHES_OK(writer->Finish()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr test_file, + mock_fs->OpenInputFile("/foo.parquet")); + std::shared_ptr tracked_input = + io::TrackedRandomAccessFile::Make(test_file.get()); + + FileSource source(tracked_input); + ASSERT_OK_AND_ASSIGN(auto fragment, + format_->MakeFragment(std::move(source), literal(true))); + + // Read the file the first time, will read metadata + auto options = std::make_shared(); + options->filter = literal(true); + ASSERT_OK_AND_ASSIGN(auto projection_descr, + ProjectionDescr::FromNames({"x"}, *test_schema)); + options->projected_schema = projection_descr.schema; + options->projection = projection_descr.expression; + ASSERT_OK_AND_ASSIGN(auto generator, fragment->ScanBatchesAsync(options)); + ASSERT_FINISHES_OK(CollectAsyncGenerator(std::move(generator))); + + ASSERT_GT(tracked_input->bytes_read(), 0); + int64_t bytes_read_first_time = tracked_input->bytes_read(); + + ASSERT_OK(tracked_input->Seek(0)); + + // Read the file the second time, should not read metadata + ASSERT_OK_AND_ASSIGN(generator, fragment->ScanBatchesAsync(options)); + ASSERT_FINISHES_OK(CollectAsyncGenerator(std::move(generator))); + int64_t bytes_read_second_time = tracked_input->bytes_read() - bytes_read_first_time; + ASSERT_LT(bytes_read_second_time, bytes_read_first_time); +} + TEST_F(TestParquetFileFormat, MultithreadedScan) { constexpr int64_t kNumRowGroups = 16; diff --git a/cpp/src/arrow/io/test_common.cc b/cpp/src/arrow/io/test_common.cc index 7cd6e518414..5caa20a445e 100644 --- a/cpp/src/arrow/io/test_common.cc +++ b/cpp/src/arrow/io/test_common.cc @@ -20,6 +20,7 @@ #include #include #include // IWYU pragma: keep +#include #ifndef _WIN32 #include @@ -30,6 +31,7 @@ #include "arrow/io/memory.h" #include "arrow/memory_pool.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/future.h" #include "arrow/util/io_util.h" namespace arrow { @@ -108,5 +110,67 @@ Result> MemoryMapFixture::InitMemoryMap( void MemoryMapFixture::AppendFile(const std::string& path) { tmp_files_.push_back(path); } +class TrackedRandomAccessFileImpl : public TrackedRandomAccessFile { + public: + explicit TrackedRandomAccessFileImpl(io::RandomAccessFile* delegate) + : delegate_(delegate) {} + + Status Close() override { return delegate_->Close(); } + bool closed() const override { return delegate_->closed(); } + Result Tell() const override { return delegate_->Tell(); } + Status Seek(int64_t position) override { return delegate_->Seek(position); } + Result Read(int64_t nbytes, void* out) override { + ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell()); + SaveReadRange(position, nbytes); + return delegate_->Read(nbytes, out); + } + Result> Read(int64_t nbytes) override { + ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell()); + SaveReadRange(position, nbytes); + return delegate_->Read(nbytes); + } + bool supports_zero_copy() const override { return delegate_->supports_zero_copy(); } + Result GetSize() override { return delegate_->GetSize(); } + Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + SaveReadRange(position, nbytes); + return delegate_->ReadAt(position, nbytes, out); + } + Result> ReadAt(int64_t position, int64_t nbytes) override { + SaveReadRange(position, nbytes); + return delegate_->ReadAt(position, nbytes); + } + Future> ReadAsync(const io::IOContext& io_context, + int64_t position, int64_t nbytes) override { + SaveReadRange(position, nbytes); + return delegate_->ReadAsync(io_context, position, nbytes); + } + + int64_t num_reads() const override { return read_ranges_.size(); } + int64_t bytes_read() const override { + int64_t sum = 0; + for (const auto& range : read_ranges_) { + sum += range.length; + } + return sum; + } + + const std::vector& get_read_ranges() const override { + return read_ranges_; + } + + private: + io::RandomAccessFile* delegate_; + std::vector read_ranges_; + + void SaveReadRange(int64_t offset, int64_t length) { + read_ranges_.emplace_back(io::ReadRange{offset, length}); + } +}; + +std::unique_ptr TrackedRandomAccessFile::Make( + io::RandomAccessFile* target) { + return std::make_unique(target); +} + } // namespace io } // namespace arrow diff --git a/cpp/src/arrow/io/test_common.h b/cpp/src/arrow/io/test_common.h index 149ee987d7c..9abaef1a665 100644 --- a/cpp/src/arrow/io/test_common.h +++ b/cpp/src/arrow/io/test_common.h @@ -21,6 +21,7 @@ #include #include +#include "arrow/io/interfaces.h" #include "arrow/testing/visibility.h" #include "arrow/type_fwd.h" @@ -54,5 +55,13 @@ class ARROW_TESTING_EXPORT MemoryMapFixture { std::vector tmp_files_; }; +class ARROW_TESTING_EXPORT TrackedRandomAccessFile : public io::RandomAccessFile { + public: + virtual int64_t num_reads() const = 0; + virtual int64_t bytes_read() const = 0; + virtual const std::vector& get_read_ranges() const = 0; + static std::unique_ptr Make(io::RandomAccessFile* target); +}; + } // namespace io } // namespace arrow diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index b556c8ed34b..77b6e2f0326 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1913,54 +1913,6 @@ TEST_F(TestFileFormatGeneratorCoalesced, Errors) { reader->GetRecordBatchGenerator(/*coalesce=*/true)); } -class TrackedRandomAccessFile : public io::RandomAccessFile { - public: - explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate) - : delegate_(delegate) {} - - Status Close() override { return delegate_->Close(); } - bool closed() const override { return delegate_->closed(); } - Result Tell() const override { return delegate_->Tell(); } - Status Seek(int64_t position) override { return delegate_->Seek(position); } - Result Read(int64_t nbytes, void* out) override { - ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell()); - SaveReadRange(position, nbytes); - return delegate_->Read(nbytes, out); - } - Result> Read(int64_t nbytes) override { - ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell()); - SaveReadRange(position, nbytes); - return delegate_->Read(nbytes); - } - bool supports_zero_copy() const override { return delegate_->supports_zero_copy(); } - Result GetSize() override { return delegate_->GetSize(); } - Result ReadAt(int64_t position, int64_t nbytes, void* out) override { - SaveReadRange(position, nbytes); - return delegate_->ReadAt(position, nbytes, out); - } - Result> ReadAt(int64_t position, int64_t nbytes) override { - SaveReadRange(position, nbytes); - return delegate_->ReadAt(position, nbytes); - } - Future> ReadAsync(const io::IOContext& io_context, - int64_t position, int64_t nbytes) override { - SaveReadRange(position, nbytes); - return delegate_->ReadAsync(io_context, position, nbytes); - } - - int64_t num_reads() const { return read_ranges_.size(); } - - const std::vector& get_read_ranges() const { return read_ranges_; } - - private: - io::RandomAccessFile* delegate_; - std::vector read_ranges_; - - void SaveReadRange(int64_t offset, int64_t length) { - read_ranges_.emplace_back(io::ReadRange{offset, length}); - } -}; - TEST(TestRecordBatchStreamReader, EmptyStreamWithDictionaries) { // ARROW-6006 auto f0 = arrow::field("f0", arrow::dictionary(arrow::int8(), arrow::utf8())); @@ -2801,19 +2753,21 @@ void GetReadRecordBatchReadRanges( auto buffer = MakeBooleanInt32Int64File(num_rows, /*num_batches=*/1); io::BufferReader buffer_reader(buffer); - TrackedRandomAccessFile tracked(&buffer_reader); + std::unique_ptr tracked = + io::TrackedRandomAccessFile::Make(&buffer_reader); auto read_options = IpcReadOptions::Defaults(); // if empty, return all fields read_options.included_fields = included_fields; - ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchFileReader::Open(&tracked, read_options)); + ASSERT_OK_AND_ASSIGN(auto reader, + RecordBatchFileReader::Open(tracked.get(), read_options)); ASSERT_OK_AND_ASSIGN(auto out_batch, reader->ReadRecordBatch(0)); ASSERT_EQ(out_batch->num_rows(), num_rows); ASSERT_EQ(out_batch->num_columns(), included_fields.empty() ? 3 : included_fields.size()); - auto read_ranges = tracked.get_read_ranges(); + auto read_ranges = tracked->get_read_ranges(); // there are 3 read IOs before reading body: // 1) read magic and footer length IO @@ -2917,7 +2871,7 @@ class PreBufferingTest : public ::testing::TestWithParam { void OpenReader() { buffer_reader_ = std::make_shared(file_buffer_); - tracked_ = std::make_shared(buffer_reader_.get()); + tracked_ = io::TrackedRandomAccessFile::Make(buffer_reader_.get()); auto read_options = IpcReadOptions::Defaults(); if (ReadsArePlugged()) { // This will ensure that all reads get globbed together into one large read @@ -2994,7 +2948,7 @@ class PreBufferingTest : public ::testing::TestWithParam { std::vector> batches_; std::shared_ptr file_buffer_; std::shared_ptr buffer_reader_; - std::shared_ptr tracked_; + std::shared_ptr tracked_; std::shared_ptr reader_; };