88 changes: 88 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -333,6 +333,53 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
  return std::move(arrow_reader);
}

Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
    const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
  ARROW_ASSIGN_OR_RAISE(
      auto parquet_scan_options,
      GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
                                                         default_fragment_scan_options));
  auto properties =
      MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
  // TODO(ARROW-12259): workaround since we have Future<(move-only type)>
  auto reader_fut =
      parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties));
  auto path = source.path();
  auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
  return reader_fut.Then(
      [=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
          -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
        ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,
                              reader_fut.MoveResult());

Member: I'm a little bit surprised this works at all, honestly. Futures aren't really compatible with move-only types; I thought you would have gotten a compile error. This reminded me to create ARROW-12559. As long as this works I think you're OK: there is only one callback being added to reader_fut, and you don't access the value that gets passed in as an arg here.

Member Author: In this case it works since Future internally heap-allocates its implementation. Yes, this is rather iffy, but it works since this is the only callback. I'll add a TODO referencing ARROW-12259.
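
A distilled sketch of the workaround under discussion, using a hypothetical move-only payload; it mirrors the pattern above and is sound only under the same single-callback assumption:

#include <memory>
#include "arrow/result.h"
#include "arrow/util/future.h"

arrow::Future<int> Deref(arrow::Future<std::unique_ptr<int>> fut) {
  // The future is captured by copy (futures share heap-allocated state);
  // the const-ref argument is ignored and MoveResult() steals the value.
  return fut.Then(
      [fut](const std::unique_ptr<int>&) mutable -> arrow::Result<int> {
        ARROW_ASSIGN_OR_RAISE(std::unique_ptr<int> value, fut.MoveResult());
        return *value;
      });
}
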
        std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
        auto arrow_properties = MakeArrowReaderProperties(*self, *metadata);
        arrow_properties.set_batch_size(options->batch_size);
        // Must be set here since the sync ScanTask handles pre-buffering itself
        arrow_properties.set_pre_buffer(
            parquet_scan_options->arrow_reader_properties->pre_buffer());
        arrow_properties.set_cache_options(
            parquet_scan_options->arrow_reader_properties->cache_options());
        arrow_properties.set_io_context(
            parquet_scan_options->arrow_reader_properties->io_context());
        // TODO: ARROW-12597 will let us enable parallel conversion
        if (!options->use_threads) {
          arrow_properties.set_use_threads(
              parquet_scan_options->enable_parallel_column_conversion);
        }

Member: I don't think we need to look at enable_parallel_column_conversion anymore since we're async?

Member Author: While benchmarking I noticed it got stuck due to this, so there's still nested parallelism coming in somewhere; I'll try to figure this out.

Member: Ah, actually, that makes sense. It would get stuck here: https://github.com/lidavidm/arrow/blob/parquet-reentrant/cpp/src/parquet/arrow/reader.cc#L959. ParallelFor does a blocking wait.

Member: We could add ParallelForAsync, which returns a Future, but that can be done in a follow-up.

Member (@westonpace, Apr 29, 2021): This could be a nice win for async in the HDD/SSD space (still recommending a follow-up PR).

Member Author: I fixed this and added a TODO for ARROW-12597.
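
For reference, one shape the suggested ParallelForAsync could take: a hedged sketch that submits each task to an Executor and composes the resulting futures instead of blocking on each one. It assumes a Status-returning task callable, Executor::Submit returning Result<Future<...>>, and AllComplete from future.h; this is illustrative, not the API that eventually landed.

#include <vector>
#include "arrow/result.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"

// `func` is a callable taking the task index and returning Status.
template <typename Func>
arrow::Result<arrow::Future<>> ParallelForAsync(
    int num_tasks, Func func, arrow::internal::Executor* executor) {
  std::vector<arrow::Future<>> futures;
  futures.reserve(num_tasks);
  for (int i = 0; i < num_tasks; ++i) {
    // Schedule each task on the executor; no blocking wait, unlike ParallelFor.
    ARROW_ASSIGN_OR_RAISE(auto task_fut, executor->Submit(func, i));
    futures.push_back(std::move(task_fut));
  }
  // Completes once every task has completed.
  return arrow::AllComplete(std::move(futures));
}
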
        std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
        RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
                                                       std::move(arrow_properties),
                                                       &arrow_reader));
        return std::move(arrow_reader);
      },
      [path](
          const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
        return status.WithMessage("Could not open Parquet input source '", path,
                                  "': ", status.message());
      });
}

Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
    const std::shared_ptr<ScanOptions>& options,
    const std::shared_ptr<FileFragment>& fragment) const {
@@ -390,6 +437,47 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
  return MakeVectorIterator(std::move(tasks));
}

Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
    const std::shared_ptr<ScanOptions>& options,
    const std::shared_ptr<FileFragment>& file) const {
  auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(file);
  std::vector<int> row_groups;
  bool pre_filtered = false;
  // If RowGroup metadata is cached completely, we can pre-filter RowGroups before
  // opening a FileReader, potentially avoiding IO altogether if all RowGroups are
  // excluded due to prior statistics knowledge. A RowGroup without statistics
  // metadata will not be excluded.
  if (parquet_fragment->metadata() != nullptr) {
    ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter));
    pre_filtered = true;
    if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
  }
  // Open the reader and pay the real IO cost.
  auto make_generator =
      [=](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
      -> Result<RecordBatchGenerator> {
    // Ensure that parquet_fragment has FileMetaData
    RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
    if (!pre_filtered) {
      // Row groups were not already filtered; do this now
      ARROW_ASSIGN_OR_RAISE(row_groups,
                            parquet_fragment->FilterRowGroups(options->filter));
      if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
    }
    auto column_projection = InferColumnProjection(*reader, *options);
    ARROW_ASSIGN_OR_RAISE(
        auto parquet_scan_options,
        GetFragmentScanOptions<ParquetFragmentScanOptions>(
            kParquetTypeName, options.get(), default_fragment_scan_options));
    ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
                                              reader, row_groups, column_projection,
                                              internal::GetCpuThreadPool()));
    return MakeReadaheadGenerator(std::move(generator), options->batch_readahead);
  };
  return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
                            .Then(std::move(make_generator)));
}
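
As a usage sketch (not part of this diff), a caller can drain the returned generator as below; the setup of format, options, and fragment is assumed, and Future::result() blocks, so it is used here purely for brevity (a real caller would compose futures instead):

arrow::Result<int64_t> CountScannedRows(
    const std::shared_ptr<arrow::dataset::ParquetFileFormat>& format,
    const std::shared_ptr<arrow::dataset::ScanOptions>& options,
    const std::shared_ptr<arrow::dataset::FileFragment>& fragment) {
  ARROW_ASSIGN_OR_RAISE(auto gen, format->ScanBatchesAsync(options, fragment));
  int64_t rows = 0;
  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto batch, gen().result());
    if (batch == nullptr) break;  // end-of-stream sentinel
    rows += batch->num_rows();
  }
  return rows;
}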

Future<util::optional<int64_t>> ParquetFileFormat::CountRows(
    const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
    const std::shared_ptr<ScanOptions>& options) {
7 changes: 7 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -99,6 +99,10 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
      const std::shared_ptr<ScanOptions>& options,
      const std::shared_ptr<FileFragment>& file) const override;

  Result<RecordBatchGenerator> ScanBatchesAsync(
      const std::shared_ptr<ScanOptions>& options,
      const std::shared_ptr<FileFragment>& file) const override;

  Future<util::optional<int64_t>> CountRows(
      const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
      const std::shared_ptr<ScanOptions>& options) override;
@@ -119,6 +123,9 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
  Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
      const FileSource& source, ScanOptions* = NULLPTR) const;

  Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
      const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;

  Result<std::shared_ptr<FileWriter>> MakeWriter(
      std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
      std::shared_ptr<FileWriteOptions> options) const override;
7 changes: 7 additions & 0 deletions cpp/src/arrow/testing/future_util.h
@@ -52,6 +52,13 @@
    ASSERT_RAISES(ENUM, _fut.status());                                   \
  } while (false)

#define EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(ENUM, matcher, expr) \
  do {                                                                    \
    auto&& fut = (expr);                                                  \
    ASSERT_FINISHES_IMPL(fut);                                            \
    EXPECT_RAISES_WITH_MESSAGE_THAT(ENUM, matcher, fut.status());         \
  } while (false)
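
// Usage sketch (illustrative, not part of this diff): like the finishes-and-
// raises assertion above, but additionally matching the error message; here a
// pre-failed future stands in for any async call:
//
//   auto fut = arrow::Future<int>::MakeFinished(
//       arrow::Status::IOError("Could not open '/tmp/missing.parquet'"));
//   EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
//       IOError, ::testing::HasSubstr("missing.parquet"), fut);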

#define ASSERT_FINISHES_OK_AND_ASSIGN_IMPL(lhs, rexpr, _future_name) \
  auto _future_name = (rexpr);                                       \
  ASSERT_FINISHES_IMPL(_future_name);                                \
8 changes: 8 additions & 0 deletions cpp/src/arrow/util/async_generator.h
@@ -1522,6 +1522,14 @@ std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
  };
}

/// \brief Make a generator that immediately ends.
///
/// This generator is async-reentrant.
template <typename T>
std::function<Future<T>()> MakeEmptyGenerator() {
  return []() -> Future<T> { return AsyncGeneratorEnd<T>(); };
}
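
// Usage sketch (not part of this diff): the empty generator finishes
// immediately with the end sentinel, IterationTraits<T>::End(); e.g.
//
//   auto gen = MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
//   auto fut = gen();  // already finished
//   // fut.result() holds a null RecordBatch, i.e. end-of-stream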

/// \brief Make a generator that always fails with a given error
///
/// This generator is async-reentrant.
3 changes: 3 additions & 0 deletions cpp/src/arrow/util/future.h
@@ -829,6 +829,9 @@ Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
  return out;
}

template <>
inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}
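
// Usage sketch (not part of this diff): with this converting constructor, a
// function returning Future<> can return a bare Status and have it wrapped
// into a finished future, e.g.
//
//   Future<> DoWriteAsync(bool ok) {
//     if (!ok) return Status::Invalid("bad input");  // implicit conversion
//     return Future<>::MakeFinished(Status::OK());
//   }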

/// \brief Create a Future which completes when all of `futures` complete.
///
/// The future will be marked complete if all `futures` complete
69 changes: 64 additions & 5 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2287,10 +2287,9 @@ TEST(TestArrowReadWrite, WaitCoalescedReads) {
  ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
  ASSERT_OK(builder.properties(properties)->Build(&reader));
  // Pre-buffer data and wait for I/O to complete.
  ASSERT_OK(reader->parquet_reader()
                ->PreBuffer({0}, {0, 1, 2, 3, 4}, ::arrow::io::IOContext(),
                            ::arrow::io::CacheOptions::Defaults())
                .status());
  reader->parquet_reader()->PreBuffer({0}, {0, 1, 2, 3, 4}, ::arrow::io::IOContext(),
                                      ::arrow::io::CacheOptions::Defaults());
  ASSERT_OK(reader->parquet_reader()->WhenBuffered({0}, {0, 1, 2, 3, 4}).status());

  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {0, 1, 2, 3, 4}, &rb_reader));
@@ -2331,6 +2330,66 @@ TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
  ASSERT_EQ(actual_batch->num_rows(), num_rows);
}

TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
  ArrowReaderProperties properties = default_arrow_reader_properties();
  const int num_rows = 1024;
  const int row_group_size = 512;
  const int num_columns = 2;

  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));

  std::shared_ptr<Buffer> buffer;
  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
                                             default_arrow_writer_properties(), &buffer));

  std::shared_ptr<FileReader> reader;
  {
    std::unique_ptr<FileReader> unique_reader;
    FileReaderBuilder builder;
    ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
    ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
    reader = std::move(unique_reader);
  }
  auto check_batches = [](const std::shared_ptr<::arrow::RecordBatch>& batch,
                          int num_columns, int num_rows) {
    ASSERT_NE(batch, nullptr);
    ASSERT_EQ(batch->num_columns(), num_columns);
    ASSERT_EQ(batch->num_rows(), num_rows);
  };

Member: This doesn't seem to actually check the contents read from file. Could you do that?

Member Author: I've adjusted the test to check equality below.
  {
    ASSERT_OK_AND_ASSIGN(auto batch_generator,
                         reader->GetRecordBatchGenerator(reader, {0, 1}, {0, 1}));
    auto fut1 = batch_generator();
    auto fut2 = batch_generator();
    auto fut3 = batch_generator();
    ASSERT_OK_AND_ASSIGN(auto batch1, fut1.result());
    ASSERT_OK_AND_ASSIGN(auto batch2, fut2.result());
    ASSERT_OK_AND_ASSIGN(auto batch3, fut3.result());
    ASSERT_EQ(batch3, nullptr);
    check_batches(batch1, num_columns, row_group_size);
    check_batches(batch2, num_columns, row_group_size);
    ASSERT_OK_AND_ASSIGN(auto actual, ::arrow::Table::FromRecordBatches(
                                          batch1->schema(), {batch1, batch2}));
    AssertTablesEqual(*table, *actual, /*same_chunk_layout=*/false);
  }
  {
    // No columns case
    ASSERT_OK_AND_ASSIGN(auto batch_generator,
                         reader->GetRecordBatchGenerator(reader, {0, 1}, {}));

Member: So you always have to pass columns explicitly?

Member Author: Yes, I figured this is a lower-level API. We could add overloads to ease this.
    auto fut1 = batch_generator();
    auto fut2 = batch_generator();
    auto fut3 = batch_generator();
    ASSERT_OK_AND_ASSIGN(auto batch1, fut1.result());
    ASSERT_OK_AND_ASSIGN(auto batch2, fut2.result());
    ASSERT_OK_AND_ASSIGN(auto batch3, fut3.result());
    ASSERT_EQ(batch3, nullptr);
    check_batches(batch1, 0, row_group_size);
    check_batches(batch2, 0, row_group_size);
  }
}

TEST(TestArrowReadWrite, ScanContents) {
  const int num_columns = 20;
  const int num_rows = 1000;
@@ -2700,7 +2759,7 @@ TEST(ArrowReadWrite, Decimal256) {

auto type = ::arrow::decimal256(8, 4);

const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678",
const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678",
"-9999.9999", "9999.9999"])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});