diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index bf703b6c6ba..30fc0bc6aca 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -1113,16 +1113,17 @@ class AsyncThreadedTableReader Future> ProcessFirstBuffer() { // First block auto first_buffer_future = buffer_generator_(); - return first_buffer_future.Then([this](const std::shared_ptr& first_buffer) - -> Result> { - if (first_buffer == nullptr) { - return Status::Invalid("Empty CSV file"); - } - std::shared_ptr first_buffer_processed; - RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer_processed)); - RETURN_NOT_OK(MakeColumnBuilders()); - return first_buffer_processed; - }); + return first_buffer_future.Then( + [self = shared_from_this()](const std::shared_ptr& first_buffer) + -> Result> { + if (first_buffer == nullptr) { + return Status::Invalid("Empty CSV file"); + } + std::shared_ptr first_buffer_processed; + RETURN_NOT_OK(self->ProcessHeader(first_buffer, &first_buffer_processed)); + RETURN_NOT_OK(self->MakeColumnBuilders()); + return first_buffer_processed; + }); } Executor* cpu_executor_; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index c22cf33eb35..84d4342a25e 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -834,5 +834,74 @@ TEST(TestParquetStatistics, NullMax) { EXPECT_EQ(stat_expression->ToString(), "(x >= 1)"); } +class DelayedBufferReader : public ::arrow::io::BufferReader { + public: + explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer) + : ::arrow::io::BufferReader(buffer) {} + + ::arrow::Future> ReadAsync( + const ::arrow::io::IOContext& io_context, int64_t position, + int64_t nbytes) override { + read_async_count.fetch_add(1); + auto self = std::dynamic_pointer_cast(shared_from_this()); + return DeferNotOk(::arrow::io::internal::SubmitIO( + io_context, [self, position, nbytes]() -> Result> { + std::this_thread::sleep_for(std::chrono::seconds(1)); + return self->DoReadAt(position, nbytes); + })); + } + + std::atomic read_async_count{0}; +}; + +TEST_F(TestParquetFileFormat, MultithreadedScanRegression) { + // GH-38438: This test is similar to MultithreadedScan, but it try to use self + // designed Executor and DelayedBufferReader to mock async execution to make + // the state machine more complex. + auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 10000, 100); + + ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get())); + + std::vector> completes; + std::vector> pools; + + for (int idx = 0; idx < 2; ++idx) { + auto buffer_reader = std::make_shared(buffer); + auto source = std::make_shared(buffer_reader, buffer->size()); + auto fragment = MakeFragment(*source); + std::shared_ptr scanner; + + { + auto options = std::make_shared(); + ASSERT_OK_AND_ASSIGN(auto thread_pool, arrow::internal::ThreadPool::Make(1)); + pools.emplace_back(thread_pool); + options->io_context = + ::arrow::io::IOContext(::arrow::default_memory_pool(), pools.back().get()); + auto fragment_scan_options = std::make_shared(); + fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); + + options->fragment_scan_options = fragment_scan_options; + ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, options); + + ASSERT_OK(builder.UseThreads(true)); + ASSERT_OK(builder.BatchSize(10000)); + ASSERT_OK_AND_ASSIGN(scanner, builder.Finish()); + } + + ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000)); + [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync(); + // Random ReadAsync calls, generate some futures to make the state machine + // more complex. + for (int yy = 0; yy < 16; yy++) { + completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001)); + } + scanner = nullptr; + } + + for (auto& f : completes) { + f.Wait(); + } +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 99b8a9ccef1..d6ad7c25bc7 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1123,10 +1123,10 @@ class RowGroupGenerator { auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); row_group_read = - ready.Then([this, reader, row_group, + ready.Then([cpu_executor = cpu_executor_, reader, row_group, column_indices = std::move( column_indices)]() -> ::arrow::Future { - return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); + return ReadOneRowGroup(cpu_executor, reader, row_group, column_indices); }); } in_flight_reads_.push({std::move(row_group_read), num_rows}); @@ -1182,7 +1182,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, int64_t rows_to_readahead) { RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices)); if (rows_to_readahead < 0) { - return Status::Invalid("rows_to_readahead must be > 0"); + return Status::Invalid("rows_to_readahead must be >= 0"); } if (reader_properties_.pre_buffer()) { BEGIN_PARQUET_CATCH_EXCEPTIONS