-
Notifications
You must be signed in to change notification settings - Fork 4k
GH-38438: [C++] Dataset: Trying to fix the async bug in Parquet dataset #38466
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -834,5 +834,74 @@ TEST(TestParquetStatistics, NullMax) { | |
| EXPECT_EQ(stat_expression->ToString(), "(x >= 1)"); | ||
| } | ||
|
|
||
| class DelayedBufferReader : public ::arrow::io::BufferReader { | ||
| public: | ||
| explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer) | ||
| : ::arrow::io::BufferReader(buffer) {} | ||
|
|
||
| ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync( | ||
| const ::arrow::io::IOContext& io_context, int64_t position, | ||
| int64_t nbytes) override { | ||
| read_async_count.fetch_add(1); | ||
| auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this()); | ||
| return DeferNotOk(::arrow::io::internal::SubmitIO( | ||
| io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> { | ||
| std::this_thread::sleep_for(std::chrono::seconds(1)); | ||
| return self->DoReadAt(position, nbytes); | ||
| })); | ||
| } | ||
|
|
||
| std::atomic<int> read_async_count{0}; | ||
| }; | ||
|
|
||
| TEST_F(TestParquetFileFormat, MultithreadedScanRegression) { | ||
| // GH-38438: This test is similar to MultithreadedScan, but it tries to use a | ||
| // custom-designed Executor and a DelayedBufferReader to mock async execution and | ||
| // make the state machine more complex. | ||
| auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100); | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); | ||
|
|
||
| std::vector<Future<>> completes; | ||
| std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools; | ||
|
|
||
| for (int idx = 0; idx < 2; ++idx) { | ||
| auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer); | ||
| auto source = std::make_shared<FileSource>(buffer_reader, buffer->size()); | ||
| auto fragment = MakeFragment(*source); | ||
| std::shared_ptr<Scanner> scanner; | ||
|
|
||
| { | ||
| auto options = std::make_shared<ScanOptions>(); | ||
| ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1)); | ||
| pools.emplace_back(thread_pool); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The pool is for the case mentioned in the description. |
||
| options->io_context = | ||
| ::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get()); | ||
| auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>(); | ||
| fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); | ||
|
|
||
| options->fragment_scan_options = fragment_scan_options; | ||
| ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options); | ||
|
|
||
| ASSERT_OK(builder.UseThreads(true)); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A thread is necessary for testing. |
||
| ASSERT_OK(builder.BatchSize(10000)); | ||
| ASSERT_OK_AND_ASSIGN(scanner, builder.Finish()); | ||
| } | ||
|
|
||
| ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000)); | ||
| [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync(); | ||
| // Random ReadAsync calls, generate some futures to make the state machine | ||
| // more complex. | ||
| for (int yy = 0; yy < 16; yy++) { | ||
| completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001)); | ||
| } | ||
| scanner = nullptr; | ||
| } | ||
|
|
||
| for (auto& f : completes) { | ||
| f.Wait(); | ||
| } | ||
| } | ||
|
|
||
| } // namespace dataset | ||
| } // namespace arrow | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1123,10 +1123,10 @@ class RowGroupGenerator { | |
| auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); | ||
| if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); | ||
| row_group_read = | ||
| ready.Then([this, reader, row_group, | ||
| ready.Then([cpu_executor = cpu_executor_, reader, row_group, | ||
| column_indices = std::move( | ||
| column_indices)]() -> ::arrow::Future<RecordBatchGenerator> { | ||
| return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); | ||
| return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices); | ||
| }); | ||
| } | ||
| in_flight_reads_.push({std::move(row_group_read), num_rows}); | ||
|
|
@@ -1182,7 +1182,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader, | |
| int64_t rows_to_readahead) { | ||
| RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices)); | ||
| if (rows_to_readahead < 0) { | ||
| return Status::Invalid("rows_to_readahead must be > 0"); | ||
| return Status::Invalid("rows_to_readahead must be >= 0"); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be 0 here. |
||
| } | ||
| if (reader_properties_.pre_buffer()) { | ||
| BEGIN_PARQUET_CATCH_EXCEPTIONS | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using 2 iterations to make the state machine a bit more complex, and also to make sure we use another Executor.