Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,23 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(

Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
return GetReaderAsync(source, options).result();
return GetReaderAsync(source, options, nullptr).result();
}

Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const {
return GetReaderAsync(source, options, metadata).result();
}

Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
return GetReaderAsync(source, options, nullptr);
}

Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const {
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
Expand All @@ -360,8 +372,8 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto reader_fut =
parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties));
auto reader_fut = parquet::ParquetFileReader::OpenAsync(
std::move(input), std::move(properties), metadata);
auto path = source.path();
auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
return reader_fut.Then(
Expand Down Expand Up @@ -443,7 +455,7 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
// If RowGroup metadata is cached completely we can pre-filter RowGroups before opening
// a FileReader, potentially avoiding IO altogether if all RowGroups are excluded due to
// prior statistics knowledge. In the case where a RowGroup doesn't have statistics
// metdata, it will not be excluded.
// metadata, it will not be excluded.
if (parquet_fragment->metadata() != nullptr) {
ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter));
pre_filtered = true;
Expand Down Expand Up @@ -483,8 +495,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
MakeSerialReadaheadGenerator(std::move(sliced), batch_readahead);
return sliced_readahead;
};
auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
.Then(std::move(make_generator)));
auto generator = MakeFromFuture(
GetReaderAsync(parquet_fragment->source(), options, parquet_fragment->metadata())
.Then(std::move(make_generator)));
WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next");
return generator;
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,17 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;

Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const;

Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;

Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const;

Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options,
Expand Down
46 changes: 46 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/test_util.h"
#include "arrow/io/memory.h"
#include "arrow/io/test_common.h"
#include "arrow/io/util_internal.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
Expand Down Expand Up @@ -292,6 +293,51 @@ TEST_F(TestParquetFileFormat, CountRowsPredicatePushdown) {
}
}

TEST_F(TestParquetFileFormat, CachedMetadata) {
// Create a test file
auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
std::shared_ptr<Schema> test_schema = schema({field("x", int32())});
std::shared_ptr<RecordBatch> batch = RecordBatchFromJSON(test_schema, "[[0]]");
ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::OutputStream> out_stream,
mock_fs->OpenOutputStream("/foo.parquet"));
ASSERT_OK_AND_ASSIGN(
std::shared_ptr<FileWriter> writer,
format_->MakeWriter(out_stream, test_schema, format_->DefaultWriteOptions(),
{mock_fs, "/foo.parquet"}));
ASSERT_OK(writer->Write(batch));
ASSERT_FINISHES_OK(writer->Finish());

ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> test_file,
mock_fs->OpenInputFile("/foo.parquet"));
std::shared_ptr<io::TrackedRandomAccessFile> tracked_input =
io::TrackedRandomAccessFile::Make(test_file.get());

FileSource source(tracked_input);
ASSERT_OK_AND_ASSIGN(auto fragment,
format_->MakeFragment(std::move(source), literal(true)));

// Read the file the first time, will read metadata
auto options = std::make_shared<ScanOptions>();
options->filter = literal(true);
ASSERT_OK_AND_ASSIGN(auto projection_descr,
ProjectionDescr::FromNames({"x"}, *test_schema));
options->projected_schema = projection_descr.schema;
options->projection = projection_descr.expression;
ASSERT_OK_AND_ASSIGN(auto generator, fragment->ScanBatchesAsync(options));
ASSERT_FINISHES_OK(CollectAsyncGenerator(std::move(generator)));

ASSERT_GT(tracked_input->bytes_read(), 0);
int64_t bytes_read_first_time = tracked_input->bytes_read();

ASSERT_OK(tracked_input->Seek(0));

// Read the file the second time, should not read metadata
ASSERT_OK_AND_ASSIGN(generator, fragment->ScanBatchesAsync(options));
ASSERT_FINISHES_OK(CollectAsyncGenerator(std::move(generator)));
int64_t bytes_read_second_time = tracked_input->bytes_read() - bytes_read_first_time;
ASSERT_LT(bytes_read_second_time, bytes_read_first_time);
}

TEST_F(TestParquetFileFormat, MultithreadedScan) {
constexpr int64_t kNumRowGroups = 16;

Expand Down
64 changes: 64 additions & 0 deletions cpp/src/arrow/io/test_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <cstdint>
#include <fstream> // IWYU pragma: keep
#include <vector>

#ifndef _WIN32
#include <fcntl.h>
Expand All @@ -30,6 +31,7 @@
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"

namespace arrow {
Expand Down Expand Up @@ -108,5 +110,67 @@ Result<std::shared_ptr<MemoryMappedFile>> MemoryMapFixture::InitMemoryMap(

void MemoryMapFixture::AppendFile(const std::string& path) { tmp_files_.push_back(path); }

class TrackedRandomAccessFileImpl : public TrackedRandomAccessFile {
public:
explicit TrackedRandomAccessFileImpl(io::RandomAccessFile* delegate)
: delegate_(delegate) {}

Status Close() override { return delegate_->Close(); }
bool closed() const override { return delegate_->closed(); }
Result<int64_t> Tell() const override { return delegate_->Tell(); }
Status Seek(int64_t position) override { return delegate_->Seek(position); }
Result<int64_t> Read(int64_t nbytes, void* out) override {
ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
SaveReadRange(position, nbytes);
return delegate_->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
SaveReadRange(position, nbytes);
return delegate_->Read(nbytes);
}
bool supports_zero_copy() const override { return delegate_->supports_zero_copy(); }
Result<int64_t> GetSize() override { return delegate_->GetSize(); }
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAt(position, nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAt(position, nbytes);
}
Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
int64_t position, int64_t nbytes) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAsync(io_context, position, nbytes);
}

int64_t num_reads() const override { return read_ranges_.size(); }
int64_t bytes_read() const override {
int64_t sum = 0;
for (const auto& range : read_ranges_) {
sum += range.length;
}
return sum;
}

const std::vector<io::ReadRange>& get_read_ranges() const override {
return read_ranges_;
}

private:
io::RandomAccessFile* delegate_;
std::vector<io::ReadRange> read_ranges_;

void SaveReadRange(int64_t offset, int64_t length) {
read_ranges_.emplace_back(io::ReadRange{offset, length});
}
};

std::unique_ptr<TrackedRandomAccessFile> TrackedRandomAccessFile::Make(
io::RandomAccessFile* target) {
return std::make_unique<TrackedRandomAccessFileImpl>(target);
}

} // namespace io
} // namespace arrow
9 changes: 9 additions & 0 deletions cpp/src/arrow/io/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <vector>

#include "arrow/io/interfaces.h"
#include "arrow/testing/visibility.h"
#include "arrow/type_fwd.h"

Expand Down Expand Up @@ -54,5 +55,13 @@ class ARROW_TESTING_EXPORT MemoryMapFixture {
std::vector<std::string> tmp_files_;
};

class ARROW_TESTING_EXPORT TrackedRandomAccessFile : public io::RandomAccessFile {
public:
virtual int64_t num_reads() const = 0;
virtual int64_t bytes_read() const = 0;
virtual const std::vector<io::ReadRange>& get_read_ranges() const = 0;
static std::unique_ptr<TrackedRandomAccessFile> Make(io::RandomAccessFile* target);
};

} // namespace io
} // namespace arrow
60 changes: 7 additions & 53 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1913,54 +1913,6 @@ TEST_F(TestFileFormatGeneratorCoalesced, Errors) {
reader->GetRecordBatchGenerator(/*coalesce=*/true));
}

class TrackedRandomAccessFile : public io::RandomAccessFile {
public:
explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
: delegate_(delegate) {}

Status Close() override { return delegate_->Close(); }
bool closed() const override { return delegate_->closed(); }
Result<int64_t> Tell() const override { return delegate_->Tell(); }
Status Seek(int64_t position) override { return delegate_->Seek(position); }
Result<int64_t> Read(int64_t nbytes, void* out) override {
ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
SaveReadRange(position, nbytes);
return delegate_->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
SaveReadRange(position, nbytes);
return delegate_->Read(nbytes);
}
bool supports_zero_copy() const override { return delegate_->supports_zero_copy(); }
Result<int64_t> GetSize() override { return delegate_->GetSize(); }
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAt(position, nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAt(position, nbytes);
}
Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
int64_t position, int64_t nbytes) override {
SaveReadRange(position, nbytes);
return delegate_->ReadAsync(io_context, position, nbytes);
}

int64_t num_reads() const { return read_ranges_.size(); }

const std::vector<io::ReadRange>& get_read_ranges() const { return read_ranges_; }

private:
io::RandomAccessFile* delegate_;
std::vector<io::ReadRange> read_ranges_;

void SaveReadRange(int64_t offset, int64_t length) {
read_ranges_.emplace_back(io::ReadRange{offset, length});
}
};

TEST(TestRecordBatchStreamReader, EmptyStreamWithDictionaries) {
// ARROW-6006
auto f0 = arrow::field("f0", arrow::dictionary(arrow::int8(), arrow::utf8()));
Expand Down Expand Up @@ -2801,19 +2753,21 @@ void GetReadRecordBatchReadRanges(
auto buffer = MakeBooleanInt32Int64File(num_rows, /*num_batches=*/1);

io::BufferReader buffer_reader(buffer);
TrackedRandomAccessFile tracked(&buffer_reader);
std::unique_ptr<io::TrackedRandomAccessFile> tracked =
io::TrackedRandomAccessFile::Make(&buffer_reader);

auto read_options = IpcReadOptions::Defaults();
// if empty, return all fields
read_options.included_fields = included_fields;
ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchFileReader::Open(&tracked, read_options));
ASSERT_OK_AND_ASSIGN(auto reader,
RecordBatchFileReader::Open(tracked.get(), read_options));
ASSERT_OK_AND_ASSIGN(auto out_batch, reader->ReadRecordBatch(0));

ASSERT_EQ(out_batch->num_rows(), num_rows);
ASSERT_EQ(out_batch->num_columns(),
included_fields.empty() ? 3 : included_fields.size());

auto read_ranges = tracked.get_read_ranges();
auto read_ranges = tracked->get_read_ranges();

// there are 3 read IOs before reading body:
// 1) read magic and footer length IO
Expand Down Expand Up @@ -2917,7 +2871,7 @@ class PreBufferingTest : public ::testing::TestWithParam<bool> {

void OpenReader() {
buffer_reader_ = std::make_shared<io::BufferReader>(file_buffer_);
tracked_ = std::make_shared<TrackedRandomAccessFile>(buffer_reader_.get());
tracked_ = io::TrackedRandomAccessFile::Make(buffer_reader_.get());
auto read_options = IpcReadOptions::Defaults();
if (ReadsArePlugged()) {
// This will ensure that all reads get globbed together into one large read
Expand Down Expand Up @@ -2994,7 +2948,7 @@ class PreBufferingTest : public ::testing::TestWithParam<bool> {
std::vector<std::shared_ptr<RecordBatch>> batches_;
std::shared_ptr<Buffer> file_buffer_;
std::shared_ptr<io::BufferReader> buffer_reader_;
std::shared_ptr<TrackedRandomAccessFile> tracked_;
std::shared_ptr<io::TrackedRandomAccessFile> tracked_;
std::shared_ptr<RecordBatchFileReader> reader_;
};

Expand Down