From 93fc284e5ba2071e26f306f5044e917fcc1d6fc1 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 14 Dec 2021 17:34:19 -0500 Subject: [PATCH 1/6] ARROW-15067: [C++] Add tracing spans to the scanner --- cpp/src/arrow/dataset/file_base.cc | 58 +++++++++++++++++++++++++++ cpp/src/arrow/dataset/file_csv.cc | 32 ++++++++++++--- cpp/src/arrow/dataset/file_ipc.cc | 40 ++++++++++++++---- cpp/src/arrow/dataset/file_parquet.cc | 25 ++++++++++-- cpp/src/arrow/dataset/scanner.cc | 18 +++++++++ cpp/src/arrow/util/tracing_internal.h | 35 +++++++++++++++- 6 files changed, 190 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index f4551c27590..ad522489e64 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -42,6 +42,7 @@ #include "arrow/util/map.h" #include "arrow/util/string.h" #include "arrow/util/task_group.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/variant.h" namespace arrow { @@ -113,6 +114,63 @@ Result> FileFormat::MakeFragment( std::move(partition_expression), std::move(physical_schema))); } +// The following implementation of ScanBatchesAsync is both ugly and terribly inefficient. +// Each of the formats should provide their own efficient implementation. However, this +// is a reasonable starting point or implementation for a dummy/mock format. +Result FileFormat::ScanBatchesAsync( + const std::shared_ptr& scan_options, + const std::shared_ptr& file) const { + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file)); + struct State { + State(std::shared_ptr scan_options, ScanTaskIterator scan_task_it) + : scan_options(std::move(scan_options)), + scan_task_it(std::move(scan_task_it)), + current_rb_it(), + finished(false) {} + + std::shared_ptr scan_options; + ScanTaskIterator scan_task_it; + RecordBatchIterator current_rb_it; + bool finished; + }; + struct Generator { + Future> operator()() { + while (!state->finished) { + if (!state->current_rb_it) { + RETURN_NOT_OK(PumpScanTask()); + if (state->finished) { + return AsyncGeneratorEnd>(); + } + } + ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next()); + if (IsIterationEnd(next_batch)) { + state->current_rb_it = RecordBatchIterator(); + } else { + return Future>::MakeFinished(next_batch); + } + } + return AsyncGeneratorEnd>(); + } + Status PumpScanTask() { + ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next()); + if (IsIterationEnd(next_task)) { + state->finished = true; + } else { + ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute()); + } + return Status::OK(); + } + std::shared_ptr state; + }; + RecordBatchGenerator generator = + Generator{std::make_shared(scan_options, std::move(scan_task_it))}; +#ifdef ARROW_WITH_OPENTELEMETRY + generator = arrow::internal::tracing::WrapAsyncGenerator( + std::move(generator), "arrow::dataset::FileFormat::ScanBatchesAsync::Next"); +#endif + return generator; +} + Result> FileFragment::ReadPhysicalSchemaImpl() { return format_->Inspect(source_); } diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 1cc7957083f..ad47529529f 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -39,6 +39,7 @@ #include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/utf8.h" namespace arrow { @@ -167,9 +168,14 @@ static inline Result GetReadOptions( static inline Future> OpenReaderAsync( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options, Executor* cpu_executor) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync"); +#endif ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); + auto path = source.path(); ARROW_ASSIGN_OR_RAISE( input, io::BufferedInputStream::Create(reader_options.block_size, default_memory_pool(), std::move(input))); @@ -190,11 +196,20 @@ static inline Future> OpenReaderAsync( })); return reader_fut.Then( // Adds the filename to the error - [](const std::shared_ptr& reader) - -> Result> { return reader; }, - [source](const Status& err) -> Result> { - return err.WithMessage("Could not open CSV input source '", source.path(), - "': ", err); + [=](const std::shared_ptr& reader) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); +#endif + return reader; + }, + [=](const Status& err) -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + arrow::internal::tracing::MarkSpan(err, span.get()); + span->End(); +#endif + return err.WithMessage("Could not open CSV input source '", path, "': ", err); }); } @@ -250,7 +265,12 @@ Result CsvFileFormat::ScanBatchesAsync( auto source = file->source(); auto reader_fut = OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool()); - return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); + auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); +#ifdef ARROW_WITH_OPENTELEMETRY + generator = arrow::internal::tracing::WrapAsyncGenerator( + std::move(generator), "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next"); +#endif + return generator; } Future> CsvFileFormat::CountRows( diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index e386c7dea8d..767713d333b 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -62,16 +62,31 @@ static inline Result> OpenReader( static inline Future> OpenReaderAsync( const FileSource& source, const ipc::IpcReadOptions& options = default_read_options()) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan("arrow::dataset::IpcFileFormat::OpenReaderAsync"); +#endif ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); auto path = source.path(); return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options) - .Then([](const std::shared_ptr& reader) - -> Result> { return reader; }, - [path](const Status& status) - -> Result> { - return status.WithMessage("Could not open IPC input source '", path, - "': ", status.message()); - }); + .Then( + [=](const std::shared_ptr& reader) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); +#endif + return reader; + }, + [=](const Status& status) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + arrow::internal::tracing::MarkSpan(status, span.get()); + span->End(); +#endif + return status.WithMessage("Could not open IPC input source '", path, + "': ", status.message()); + }); } static inline Result> GetIncludedFields( @@ -121,6 +136,10 @@ Result> IpcFileFormat::Inspect(const FileSource& source) Result IpcFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto parent_span = tracer->GetCurrentSpan(); +#endif auto self = shared_from_this(); auto source = file->source(); auto open_reader = OpenReaderAsync(source); @@ -151,6 +170,13 @@ Result IpcFileFormat::ScanBatchesAsync( ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( /*coalesce=*/false, options->io_context)); } +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::trace::StartSpanOptions span_options; + span_options.parent = parent_span->GetContext(); + generator = arrow::internal::tracing::WrapAsyncGenerator( + std::move(generator), std::move(span_options), + "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next"); +#endif auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level); return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size); }; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index cdee1f00684..c22fd963685 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -34,6 +34,7 @@ #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/range.h" +#include "arrow/util/tracing_internal.h" #include "parquet/arrow/reader.h" #include "parquet/arrow/schema.h" #include "parquet/arrow/writer.h" @@ -366,6 +367,10 @@ Result> ParquetFileFormat::GetReader Future> ParquetFileFormat::GetReaderAsync( const FileSource& source, const std::shared_ptr& options) const { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan("arrow::dataset::ParquetFileFormat::GetReaderAsync"); +#endif ARROW_ASSIGN_OR_RAISE( auto parquet_scan_options, GetFragmentScanOptions(kParquetTypeName, options.get(), @@ -398,10 +403,17 @@ Future> ParquetFileFormat::GetReader RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader), std::move(arrow_properties), &arrow_reader)); +#ifdef ARROW_WITH_OPENTELEMETRY + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); +#endif return std::move(arrow_reader); }, - [path]( - const Status& status) -> Result> { + [=](const Status& status) -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + arrow::internal::tracing::MarkSpan(status, span.get()); + span->End(); +#endif return WrapSourceError(status, path); }); } @@ -448,8 +460,13 @@ Result ParquetFileFormat::ScanBatchesAsync( ::arrow::internal::GetCpuThreadPool(), row_group_readahead)); return generator; }; - return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) - .Then(std::move(make_generator))); + auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) + .Then(std::move(make_generator))); +#ifdef ARROW_WITH_OPENTELEMETRY + generator = arrow::internal::tracing::WrapAsyncGenerator( + std::move(generator), "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); +#endif + return generator; } Future> ParquetFileFormat::CountRows( diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index c99316f764a..eb9ccb2eeaa 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -35,10 +35,12 @@ #include "arrow/dataset/plan.h" #include "arrow/table.h" #include "arrow/util/async_generator.h" +#include "arrow/util/config.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -126,6 +128,18 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this FragmentToBatches( const Enumerated>& fragment, const std::shared_ptr& options) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan( + "arrow::dataset::FragmentToBatches", + { + {"arrow.dataset.fragment", fragment.value->ToString()}, + {"arrow.dataset.fragment.index", fragment.index}, + {"arrow.dataset.fragment.last", fragment.last}, + {"arrow.dataset.fragment.type_name", fragment.value->type_name()}, + }); + auto scope = tracer->WithActiveSpan(span); +#endif ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options)); ArrayVector columns; for (const auto& field : options->dataset_schema->fields()) { @@ -134,6 +148,10 @@ Result FragmentToBatches( MakeArrayOfNull(field->type(), /*length=*/0, options->pool)); columns.push_back(std::move(array)); } +#ifdef ARROW_WITH_OPENTELEMETRY + batch_gen = + arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(batch_gen), span); +#endif batch_gen = MakeDefaultIfEmptyGenerator( std::move(batch_gen), RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns))); diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index d4947ac88fe..156839471e0 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -88,9 +88,10 @@ Iterator WrapIterator( template AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, + opentelemetry::trace::StartSpanOptions options, const std::string& span_name) { return [=]() mutable -> Future { - auto span = GetTracer()->StartSpan(span_name); + auto span = GetTracer()->StartSpan(span_name, {}, options); auto scope = GetTracer()->WithActiveSpan(span); auto fut = wrapped(); fut.AddCallback([span](const Result& result) { @@ -101,6 +102,38 @@ AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, }; } +/// \brief Start a new span for each invocation of a generator. +/// +/// The parent span of the new span will be the currently active span +/// (if any) as of when WrapAsyncGenerator was itself called. +template +AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, + const std::string& span_name) { + opentelemetry::trace::StartSpanOptions options; + options.parent = GetTracer()->GetCurrentSpan()->GetContext(); + return WrapAsyncGenerator(std::move(wrapped), std::move(options), span_name); +} + +/// \brief End the given span when the given async generator ends. +/// +/// The span will be made the active span each time the generator is called. +template +AsyncGenerator TieSpanToAsyncGenerator( + AsyncGenerator wrapped, + opentelemetry::nostd::shared_ptr span) { + return [=]() mutable -> Future { + auto scope = GetTracer()->WithActiveSpan(span); + auto fut = wrapped(); + fut.AddCallback([span](const Result& result) { + if (!result.ok() || IsIterationEnd(*result)) { + MarkSpan(result.status(), span.get()); + span->End(); + } + }); + return fut; + }; +} + class SpanImpl { public: opentelemetry::nostd::shared_ptr span; From 9f56044e17a674f58057664e694cb23a0715052f Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 15 Dec 2021 13:37:49 -0500 Subject: [PATCH 2/6] ARROW-15067: [C++] Add ASAN suppressions for OTel --- cpp/build-support/lsan-suppressions.txt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cpp/build-support/lsan-suppressions.txt b/cpp/build-support/lsan-suppressions.txt index 566857a9c09..a8918e10d94 100644 --- a/cpp/build-support/lsan-suppressions.txt +++ b/cpp/build-support/lsan-suppressions.txt @@ -19,3 +19,10 @@ leak:*__new_exitfn* # Leak at shutdown in OpenSSL leak:CRYPTO_zalloc + +# OpenTelemetry. These seem like false positives and go away if the +# CPU thread pool is manually shut down before exit. +# Note that ASan has trouble backtracing these and may not be able to +# without LSAN_OPTIONS=fast_unwind_on_malloc=0:malloc_context_size=100 +leak:opentelemetry::v1::context::ThreadLocalContextStorage::GetStack +leak:opentelemetry::v1::context::ThreadLocalContextStorage::Stack::Resize From 2771d32e702700855631f4430247b38dfd130ab1 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 15 Dec 2021 13:49:35 -0500 Subject: [PATCH 3/6] ARROW-15067: [C++] Add Valgrind suppressions for OTel --- cpp/valgrind.supp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cpp/valgrind.supp b/cpp/valgrind.supp index 8d2d5da904b..2b9959136a8 100644 --- a/cpp/valgrind.supp +++ b/cpp/valgrind.supp @@ -51,3 +51,21 @@ ... fun:*re2*Prog* } +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:*opentelemetry*ThreadLocalContextStorage*GetStack* +} +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:*opentelemetry*ThreadLocalContextStorage*Stack*Resize* +} +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:_dl_allocate_tls +} From 8a1e67d853c25adff28f6b0ccaed96d282c45067 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 15 Dec 2021 14:42:56 -0500 Subject: [PATCH 4/6] ARROW-15067: [C++] Ensure spans propagate through call to scanner --- cpp/src/arrow/dataset/file_ipc.cc | 1 + cpp/src/arrow/dataset/scanner.cc | 14 ++++++++++---- cpp/src/arrow/util/tracing_internal.h | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 767713d333b..79f2eafae05 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -30,6 +30,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" namespace arrow { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index eb9ccb2eeaa..6cf83f0121e 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -168,10 +168,16 @@ Result FragmentToBatches( Result> FragmentsToBatches( FragmentGenerator fragment_gen, const std::shared_ptr& options) { auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen)); - return MakeMappedGenerator(std::move(enumerated_fragment_gen), - [=](const Enumerated>& fragment) { - return FragmentToBatches(fragment, options); - }); + auto batch_gen_gen = + MakeMappedGenerator(std::move(enumerated_fragment_gen), + [=](const Enumerated>& fragment) { + return FragmentToBatches(fragment, options); + }); +#ifdef ARROW_WITH_OPENTELEMETRY + batch_gen_gen = arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( + std::move(batch_gen_gen)); +#endif + return batch_gen_gen; } const FieldVector kAugmentedFields{ diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 156839471e0..f10a2450300 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -134,6 +134,25 @@ AsyncGenerator TieSpanToAsyncGenerator( }; } +/// \brief Activate the given span on each invocation of an async generator. +template +AsyncGenerator PropagateSpanThroughAsyncGenerator( + AsyncGenerator wrapped, + opentelemetry::nostd::shared_ptr span) { + return [=]() mutable -> Future { + auto scope = GetTracer()->WithActiveSpan(span); + return wrapped(); + }; +} + +/// \brief Activate the given span on each invocation of an async generator. +template +AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) { + auto span = GetTracer()->GetCurrentSpan(); + if (!span->GetContext().IsValid()) return wrapped; + return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); +} + class SpanImpl { public: opentelemetry::nostd::shared_ptr span; From 1a48e764158e1c1884ad638d93d5b9e6bffb5377 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 20 Dec 2021 09:30:34 -0500 Subject: [PATCH 5/6] ARROW-15044: [C++] Use Then() --- cpp/src/arrow/util/tracing_internal.h | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index f10a2450300..0fbebb68510 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -123,14 +123,15 @@ AsyncGenerator TieSpanToAsyncGenerator( opentelemetry::nostd::shared_ptr span) { return [=]() mutable -> Future { auto scope = GetTracer()->WithActiveSpan(span); - auto fut = wrapped(); - fut.AddCallback([span](const Result& result) { - if (!result.ok() || IsIterationEnd(*result)) { - MarkSpan(result.status(), span.get()); - span->End(); - } - }); - return fut; + return wrapped().Then( + [span](const T& result) -> Result { + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + return result; + }, + [span](const Status& status) -> Result { + MarkSpan(status, span.get()); + return status; + }); }; } From 89e01e8c8fe5175845337a12eae60fe083c0776f Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Thu, 3 Feb 2022 22:21:28 +0100 Subject: [PATCH 6/6] Fix rebase. Scanner::Scan was removed in ARROW-13554. --- cpp/src/arrow/dataset/file_base.cc | 57 ------------------------------ 1 file changed, 57 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index ad522489e64..20fc4a4aade 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -114,63 +114,6 @@ Result> FileFormat::MakeFragment( std::move(partition_expression), std::move(physical_schema))); } -// The following implementation of ScanBatchesAsync is both ugly and terribly inefficient. -// Each of the formats should provide their own efficient implementation. However, this -// is a reasonable starting point or implementation for a dummy/mock format. -Result FileFormat::ScanBatchesAsync( - const std::shared_ptr& scan_options, - const std::shared_ptr& file) const { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file)); - struct State { - State(std::shared_ptr scan_options, ScanTaskIterator scan_task_it) - : scan_options(std::move(scan_options)), - scan_task_it(std::move(scan_task_it)), - current_rb_it(), - finished(false) {} - - std::shared_ptr scan_options; - ScanTaskIterator scan_task_it; - RecordBatchIterator current_rb_it; - bool finished; - }; - struct Generator { - Future> operator()() { - while (!state->finished) { - if (!state->current_rb_it) { - RETURN_NOT_OK(PumpScanTask()); - if (state->finished) { - return AsyncGeneratorEnd>(); - } - } - ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next()); - if (IsIterationEnd(next_batch)) { - state->current_rb_it = RecordBatchIterator(); - } else { - return Future>::MakeFinished(next_batch); - } - } - return AsyncGeneratorEnd>(); - } - Status PumpScanTask() { - ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next()); - if (IsIterationEnd(next_task)) { - state->finished = true; - } else { - ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute()); - } - return Status::OK(); - } - std::shared_ptr state; - }; - RecordBatchGenerator generator = - Generator{std::make_shared(scan_options, std::move(scan_task_it))}; -#ifdef ARROW_WITH_OPENTELEMETRY - generator = arrow::internal::tracing::WrapAsyncGenerator( - std::move(generator), "arrow::dataset::FileFormat::ScanBatchesAsync::Next"); -#endif - return generator; -} - Result> FileFragment::ReadPhysicalSchemaImpl() { return format_->Inspect(source_); }