From c78f0cf59c43216e83c2b8ad01e6ba6462a38418 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 14 Dec 2021 17:34:19 -0500 Subject: [PATCH 01/42] ARROW-15067: [C++] Add tracing spans to the scanner --- cpp/src/arrow/dataset/file_base.cc | 58 +++++++++++++++++++++++++++ cpp/src/arrow/dataset/file_csv.cc | 32 ++++++++++++--- cpp/src/arrow/dataset/file_ipc.cc | 40 ++++++++++++++---- cpp/src/arrow/dataset/file_parquet.cc | 25 ++++++++++-- cpp/src/arrow/dataset/scanner.cc | 18 +++++++++ cpp/src/arrow/util/tracing_internal.h | 35 +++++++++++++++- 6 files changed, 190 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index f4551c27590..ad522489e64 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -42,6 +42,7 @@ #include "arrow/util/map.h" #include "arrow/util/string.h" #include "arrow/util/task_group.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/variant.h" namespace arrow { @@ -113,6 +114,63 @@ Result> FileFormat::MakeFragment( std::move(partition_expression), std::move(physical_schema))); } +// The following implementation of ScanBatchesAsync is both ugly and terribly inefficient. +// Each of the formats should provide their own efficient implementation. However, this +// is a reasonable starting point or implementation for a dummy/mock format. +Result FileFormat::ScanBatchesAsync( + const std::shared_ptr& scan_options, + const std::shared_ptr& file) const { + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file)); + struct State { + State(std::shared_ptr scan_options, ScanTaskIterator scan_task_it) + : scan_options(std::move(scan_options)), + scan_task_it(std::move(scan_task_it)), + current_rb_it(), + finished(false) {} + + std::shared_ptr scan_options; + ScanTaskIterator scan_task_it; + RecordBatchIterator current_rb_it; + bool finished; + }; + struct Generator { + Future> operator()() { + while (!state->finished) { + if (!state->current_rb_it) { + RETURN_NOT_OK(PumpScanTask()); + if (state->finished) { + return AsyncGeneratorEnd>(); + } + } + ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next()); + if (IsIterationEnd(next_batch)) { + state->current_rb_it = RecordBatchIterator(); + } else { + return Future>::MakeFinished(next_batch); + } + } + return AsyncGeneratorEnd>(); + } + Status PumpScanTask() { + ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next()); + if (IsIterationEnd(next_task)) { + state->finished = true; + } else { + ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute()); + } + return Status::OK(); + } + std::shared_ptr state; + }; + RecordBatchGenerator generator = + Generator{std::make_shared(scan_options, std::move(scan_task_it))}; +#ifdef ARROW_WITH_OPENTELEMETRY + generator = arrow::internal::tracing::WrapAsyncGenerator( + std::move(generator), "arrow::dataset::FileFormat::ScanBatchesAsync::Next"); +#endif + return generator; +} + Result> FileFragment::ReadPhysicalSchemaImpl() { return format_->Inspect(source_); } diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 1cc7957083f..ad47529529f 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -39,6 +39,7 @@ #include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/utf8.h" namespace arrow { @@ -167,9 +168,14 @@ static inline Result GetReadOptions( static inline Future> OpenReaderAsync( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options, Executor* cpu_executor) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync"); +#endif ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); + auto path = source.path(); ARROW_ASSIGN_OR_RAISE( input, io::BufferedInputStream::Create(reader_options.block_size, default_memory_pool(), std::move(input))); @@ -190,11 +196,20 @@ static inline Future> OpenReaderAsync( })); return reader_fut.Then( // Adds the filename to the error - [](const std::shared_ptr& reader) - -> Result> { return reader; }, - [source](const Status& err) -> Result> { - return err.WithMessage("Could not open CSV input source '", source.path(), - "': ", err); + [=](const std::shared_ptr& reader) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); +#endif + return reader; + }, + [=](const Status& err) -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + arrow::internal::tracing::MarkSpan(err, span.get()); + span->End(); +#endif + return err.WithMessage("Could not open CSV input source '", path, "': ", err); }); } @@ -250,7 +265,12 @@ Result CsvFileFormat::ScanBatchesAsync( auto source = file->source(); auto reader_fut = OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool()); - return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); + auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); +#ifdef ARROW_WITH_OPENTELEMETRY + generator = arrow::internal::tracing::WrapAsyncGenerator( + std::move(generator), "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next"); +#endif + return generator; } Future> CsvFileFormat::CountRows( diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index e386c7dea8d..767713d333b 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -62,16 +62,31 @@ static inline Result> OpenReader( static inline Future> OpenReaderAsync( const FileSource& source, const ipc::IpcReadOptions& options = default_read_options()) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan("arrow::dataset::IpcFileFormat::OpenReaderAsync"); +#endif ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); auto path = source.path(); return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options) - .Then([](const std::shared_ptr& reader) - -> Result> { return reader; }, - [path](const Status& status) - -> Result> { - return status.WithMessage("Could not open IPC input source '", path, - "': ", status.message()); - }); + .Then( + [=](const std::shared_ptr& reader) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); +#endif + return reader; + }, + [=](const Status& status) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + arrow::internal::tracing::MarkSpan(status, span.get()); + span->End(); +#endif + return status.WithMessage("Could not open IPC input source '", path, + "': ", status.message()); + }); } static inline Result> GetIncludedFields( @@ -121,6 +136,10 @@ Result> IpcFileFormat::Inspect(const FileSource& source) Result IpcFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto parent_span = tracer->GetCurrentSpan(); +#endif auto self = shared_from_this(); auto source = file->source(); auto open_reader = OpenReaderAsync(source); @@ -151,6 +170,13 @@ Result IpcFileFormat::ScanBatchesAsync( ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( /*coalesce=*/false, options->io_context)); } +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::trace::StartSpanOptions span_options; + span_options.parent = parent_span->GetContext(); + generator = arrow::internal::tracing::WrapAsyncGenerator( + std::move(generator), std::move(span_options), + "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next"); +#endif auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level); return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size); }; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 5fbd457eccd..dabb0bc2d34 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -34,6 +34,7 @@ #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/range.h" +#include "arrow/util/tracing_internal.h" #include "parquet/arrow/reader.h" #include "parquet/arrow/schema.h" #include "parquet/arrow/writer.h" @@ -333,6 +334,10 @@ Result> ParquetFileFormat::GetReader Future> ParquetFileFormat::GetReaderAsync( const FileSource& source, const std::shared_ptr& options) const { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan("arrow::dataset::ParquetFileFormat::GetReaderAsync"); +#endif ARROW_ASSIGN_OR_RAISE( auto parquet_scan_options, GetFragmentScanOptions(kParquetTypeName, options.get(), @@ -365,10 +370,17 @@ Future> ParquetFileFormat::GetReader RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader), std::move(arrow_properties), &arrow_reader)); +#ifdef ARROW_WITH_OPENTELEMETRY + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); +#endif return std::move(arrow_reader); }, - [path]( - const Status& status) -> Result> { + [=](const Status& status) -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + arrow::internal::tracing::MarkSpan(status, span.get()); + span->End(); +#endif return WrapSourceError(status, path); }); } @@ -415,8 +427,13 @@ Result ParquetFileFormat::ScanBatchesAsync( ::arrow::internal::GetCpuThreadPool(), row_group_readahead)); return generator; }; - return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) - .Then(std::move(make_generator))); + auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) + .Then(std::move(make_generator))); +#ifdef ARROW_WITH_OPENTELEMETRY + generator = arrow::internal::tracing::WrapAsyncGenerator( + std::move(generator), "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); +#endif + return generator; } Future> ParquetFileFormat::CountRows( diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index b958f7b9e62..93c0bc9f077 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -35,10 +35,12 @@ #include "arrow/dataset/plan.h" #include "arrow/table.h" #include "arrow/util/async_generator.h" +#include "arrow/util/config.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -206,6 +208,18 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this FragmentToBatches( const Enumerated>& fragment, const std::shared_ptr& options) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan( + "arrow::dataset::FragmentToBatches", + { + {"arrow.dataset.fragment", fragment.value->ToString()}, + {"arrow.dataset.fragment.index", fragment.index}, + {"arrow.dataset.fragment.last", fragment.last}, + {"arrow.dataset.fragment.type_name", fragment.value->type_name()}, + }); + auto scope = tracer->WithActiveSpan(span); +#endif ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options)); ArrayVector columns; for (const auto& field : options->dataset_schema->fields()) { @@ -214,6 +228,10 @@ Result FragmentToBatches( MakeArrayOfNull(field->type(), /*length=*/0, options->pool)); columns.push_back(std::move(array)); } +#ifdef ARROW_WITH_OPENTELEMETRY + batch_gen = + arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(batch_gen), span); +#endif batch_gen = MakeDefaultIfEmptyGenerator( std::move(batch_gen), RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns))); diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index d4947ac88fe..156839471e0 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -88,9 +88,10 @@ Iterator WrapIterator( template AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, + opentelemetry::trace::StartSpanOptions options, const std::string& span_name) { return [=]() mutable -> Future { - auto span = GetTracer()->StartSpan(span_name); + auto span = GetTracer()->StartSpan(span_name, {}, options); auto scope = GetTracer()->WithActiveSpan(span); auto fut = wrapped(); fut.AddCallback([span](const Result& result) { @@ -101,6 +102,38 @@ AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, }; } +/// \brief Start a new span for each invocation of a generator. +/// +/// The parent span of the new span will be the currently active span +/// (if any) as of when WrapAsyncGenerator was itself called. +template +AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, + const std::string& span_name) { + opentelemetry::trace::StartSpanOptions options; + options.parent = GetTracer()->GetCurrentSpan()->GetContext(); + return WrapAsyncGenerator(std::move(wrapped), std::move(options), span_name); +} + +/// \brief End the given span when the given async generator ends. +/// +/// The span will be made the active span each time the generator is called. +template +AsyncGenerator TieSpanToAsyncGenerator( + AsyncGenerator wrapped, + opentelemetry::nostd::shared_ptr span) { + return [=]() mutable -> Future { + auto scope = GetTracer()->WithActiveSpan(span); + auto fut = wrapped(); + fut.AddCallback([span](const Result& result) { + if (!result.ok() || IsIterationEnd(*result)) { + MarkSpan(result.status(), span.get()); + span->End(); + } + }); + return fut; + }; +} + class SpanImpl { public: opentelemetry::nostd::shared_ptr span; From 18dd5572d7eb1ee32237baa173b166a664606808 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 15 Dec 2021 13:37:49 -0500 Subject: [PATCH 02/42] ARROW-15067: [C++] Add ASAN suppressions for OTel --- cpp/build-support/lsan-suppressions.txt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cpp/build-support/lsan-suppressions.txt b/cpp/build-support/lsan-suppressions.txt index 566857a9c09..a8918e10d94 100644 --- a/cpp/build-support/lsan-suppressions.txt +++ b/cpp/build-support/lsan-suppressions.txt @@ -19,3 +19,10 @@ leak:*__new_exitfn* # Leak at shutdown in OpenSSL leak:CRYPTO_zalloc + +# OpenTelemetry. These seem like false positives and go away if the +# CPU thread pool is manually shut down before exit. +# Note that ASan has trouble backtracing these and may not be able to +# without LSAN_OPTIONS=fast_unwind_on_malloc=0:malloc_context_size=100 +leak:opentelemetry::v1::context::ThreadLocalContextStorage::GetStack +leak:opentelemetry::v1::context::ThreadLocalContextStorage::Stack::Resize From 12abf3bcb46cde0626ab8996c70efc7792074d93 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 15 Dec 2021 13:49:35 -0500 Subject: [PATCH 03/42] ARROW-15067: [C++] Add Valgrind suppressions for OTel --- cpp/valgrind.supp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cpp/valgrind.supp b/cpp/valgrind.supp index 8d2d5da904b..2b9959136a8 100644 --- a/cpp/valgrind.supp +++ b/cpp/valgrind.supp @@ -51,3 +51,21 @@ ... fun:*re2*Prog* } +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:*opentelemetry*ThreadLocalContextStorage*GetStack* +} +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:*opentelemetry*ThreadLocalContextStorage*Stack*Resize* +} +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:_dl_allocate_tls +} From d9625639f7cc639685518a790152ede3121708b4 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 15 Dec 2021 14:42:56 -0500 Subject: [PATCH 04/42] ARROW-15067: [C++] Ensure spans propagate through call to scanner --- cpp/src/arrow/dataset/file_ipc.cc | 1 + cpp/src/arrow/dataset/scanner.cc | 14 ++++++++++---- cpp/src/arrow/util/tracing_internal.h | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 767713d333b..79f2eafae05 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -30,6 +30,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" namespace arrow { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 93c0bc9f077..a8575e7bf1a 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -248,10 +248,16 @@ Result FragmentToBatches( Result> FragmentsToBatches( FragmentGenerator fragment_gen, const std::shared_ptr& options) { auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen)); - return MakeMappedGenerator(std::move(enumerated_fragment_gen), - [=](const Enumerated>& fragment) { - return FragmentToBatches(fragment, options); - }); + auto batch_gen_gen = + MakeMappedGenerator(std::move(enumerated_fragment_gen), + [=](const Enumerated>& fragment) { + return FragmentToBatches(fragment, options); + }); +#ifdef ARROW_WITH_OPENTELEMETRY + batch_gen_gen = arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( + std::move(batch_gen_gen)); +#endif + return batch_gen_gen; } class OneShotFragment : public Fragment { diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 156839471e0..f10a2450300 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -134,6 +134,25 @@ AsyncGenerator TieSpanToAsyncGenerator( }; } +/// \brief Activate the given span on each invocation of an async generator. +template +AsyncGenerator PropagateSpanThroughAsyncGenerator( + AsyncGenerator wrapped, + opentelemetry::nostd::shared_ptr span) { + return [=]() mutable -> Future { + auto scope = GetTracer()->WithActiveSpan(span); + return wrapped(); + }; +} + +/// \brief Activate the given span on each invocation of an async generator. +template +AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) { + auto span = GetTracer()->GetCurrentSpan(); + if (!span->GetContext().IsValid()) return wrapped; + return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); +} + class SpanImpl { public: opentelemetry::nostd::shared_ptr span; From 7385c96496d1ee395f8b7e12fd8d997f2d543924 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 20 Dec 2021 09:30:34 -0500 Subject: [PATCH 05/42] ARROW-15044: [C++] Use Then() --- cpp/src/arrow/util/tracing_internal.h | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index f10a2450300..0fbebb68510 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -123,14 +123,15 @@ AsyncGenerator TieSpanToAsyncGenerator( opentelemetry::nostd::shared_ptr span) { return [=]() mutable -> Future { auto scope = GetTracer()->WithActiveSpan(span); - auto fut = wrapped(); - fut.AddCallback([span](const Result& result) { - if (!result.ok() || IsIterationEnd(*result)) { - MarkSpan(result.status(), span.get()); - span->End(); - } - }); - return fut; + return wrapped().Then( + [span](const T& result) -> Result { + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + return result; + }, + [span](const Status& status) -> Result { + MarkSpan(status, span.get()); + return status; + }); }; } From 26d41e6ae7c5bad4ef431f0f7f371932d0876578 Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Thu, 3 Feb 2022 22:21:28 +0100 Subject: [PATCH 06/42] Fix rebase. Scanner::Scan was removed in ARROW-13554. --- cpp/src/arrow/dataset/file_base.cc | 57 ------------------------------ 1 file changed, 57 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index ad522489e64..20fc4a4aade 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -114,63 +114,6 @@ Result> FileFormat::MakeFragment( std::move(partition_expression), std::move(physical_schema))); } -// The following implementation of ScanBatchesAsync is both ugly and terribly inefficient. -// Each of the formats should provide their own efficient implementation. However, this -// is a reasonable starting point or implementation for a dummy/mock format. -Result FileFormat::ScanBatchesAsync( - const std::shared_ptr& scan_options, - const std::shared_ptr& file) const { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file)); - struct State { - State(std::shared_ptr scan_options, ScanTaskIterator scan_task_it) - : scan_options(std::move(scan_options)), - scan_task_it(std::move(scan_task_it)), - current_rb_it(), - finished(false) {} - - std::shared_ptr scan_options; - ScanTaskIterator scan_task_it; - RecordBatchIterator current_rb_it; - bool finished; - }; - struct Generator { - Future> operator()() { - while (!state->finished) { - if (!state->current_rb_it) { - RETURN_NOT_OK(PumpScanTask()); - if (state->finished) { - return AsyncGeneratorEnd>(); - } - } - ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next()); - if (IsIterationEnd(next_batch)) { - state->current_rb_it = RecordBatchIterator(); - } else { - return Future>::MakeFinished(next_batch); - } - } - return AsyncGeneratorEnd>(); - } - Status PumpScanTask() { - ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next()); - if (IsIterationEnd(next_task)) { - state->finished = true; - } else { - ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute()); - } - return Status::OK(); - } - std::shared_ptr state; - }; - RecordBatchGenerator generator = - Generator{std::make_shared(scan_options, std::move(scan_task_it))}; -#ifdef ARROW_WITH_OPENTELEMETRY - generator = arrow::internal::tracing::WrapAsyncGenerator( - std::move(generator), "arrow::dataset::FileFormat::ScanBatchesAsync::Next"); -#endif - return generator; -} - Result> FileFragment::ReadPhysicalSchemaImpl() { return format_->Inspect(source_); } From c5805e07982095cf680b7e581a5f6c2415dd07d7 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 11 Mar 2022 13:47:24 +0100 Subject: [PATCH 07/42] Propagating spans to asynchronous parquet reader --- cpp/src/arrow/dataset/file_parquet.cc | 6 ++++++ cpp/src/parquet/arrow/reader.cc | 26 +++++++++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index dabb0bc2d34..cf461e09b81 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -400,10 +400,16 @@ Result ParquetFileFormat::ScanBatchesAsync( pre_filtered = true; if (row_groups.empty()) return MakeEmptyGenerator>(); } +#ifdef ARROW_WITH_OPENTELEMETRY + auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); +#endif // Open the reader and pay the real IO cost. auto make_generator = [=](const std::shared_ptr& reader) mutable -> Result { +#ifdef ARROW_WITH_OPENTELEMETRY + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); +#endif // Ensure that parquet_fragment has FileMetaData RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get())); if (!pre_filtered) { diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 3b0b622e925..dd767a984c3 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -46,6 +46,8 @@ #include "parquet/properties.h" #include "parquet/schema.h" +#include "arrow/util/tracing_internal.h" + using arrow::Array; using arrow::ArrayData; using arrow::BooleanArray; @@ -67,6 +69,7 @@ using arrow::TimestampArray; using arrow::internal::checked_cast; using arrow::internal::Iota; + // Help reduce verbosity using ParquetReader = parquet::ParquetFileReader; @@ -1060,7 +1063,14 @@ class RowGroupGenerator { } auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); - return ready.Then([=]() -> ::arrow::Future { + +#ifdef ARROW_WITH_OPENTELEMETRY + auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); +#endif + return ready.Then([=]() mutable -> ::arrow::Future { +#ifdef ARROW_WITH_OPENTELEMETRY + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); +#endif return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); }); } @@ -1125,6 +1135,11 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, row_group_generator = ::arrow::MakeReadaheadGenerator(std::move(row_group_generator), row_group_readahead); } +#ifdef ARROW_WITH_OPENTELEMETRY + auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); + row_group_generator = + ::arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(row_group_generator), span); +#endif return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); } @@ -1173,11 +1188,16 @@ Future> FileReaderImpl::DecodeRowGroups( // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); - auto read_column = [row_groups, self, this](size_t i, - std::shared_ptr reader) + auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); + auto read_column = [row_groups, self, span, this](size_t i, + std::shared_ptr reader) mutable // need to add mutable to prevent thread_span constness -> ::arrow::Result> { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); + auto newspan = ::arrow::internal::tracing::GetTracer()->StartSpan( + std::string("arrow::parquet::DecodeRowGroups - read_column")); std::shared_ptr<::arrow::ChunkedArray> column; RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); + newspan->End(); return column; }; auto make_table = [result_schema, row_groups, self, From f74b77e46f56c348ca531ae055d41189b1ea4705 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 11 Mar 2022 15:59:26 +0100 Subject: [PATCH 08/42] Removed a span layer from scanner, added some config ifdefs --- cpp/src/arrow/dataset/file_parquet.cc | 12 ------------ cpp/src/parquet/arrow/reader.cc | 15 ++++++++++----- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index cf461e09b81..bbce60526fb 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -334,10 +334,6 @@ Result> ParquetFileFormat::GetReader Future> ParquetFileFormat::GetReaderAsync( const FileSource& source, const std::shared_ptr& options) const { -#ifdef ARROW_WITH_OPENTELEMETRY - auto tracer = arrow::internal::tracing::GetTracer(); - auto span = tracer->StartSpan("arrow::dataset::ParquetFileFormat::GetReaderAsync"); -#endif ARROW_ASSIGN_OR_RAISE( auto parquet_scan_options, GetFragmentScanOptions(kParquetTypeName, options.get(), @@ -370,17 +366,9 @@ Future> ParquetFileFormat::GetReader RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader), std::move(arrow_properties), &arrow_reader)); -#ifdef ARROW_WITH_OPENTELEMETRY - span->SetStatus(opentelemetry::trace::StatusCode::kOk); - span->End(); -#endif return std::move(arrow_reader); }, [=](const Status& status) -> Result> { -#ifdef ARROW_WITH_OPENTELEMETRY - arrow::internal::tracing::MarkSpan(status, span.get()); - span->End(); -#endif return WrapSourceError(status, path); }); } diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index dd767a984c3..eaaf7ebccd4 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1188,16 +1188,21 @@ Future> FileReaderImpl::DecodeRowGroups( // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); +#ifdef ARROW_WITH_OPENTELEMETRY auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); +#endif auto read_column = [row_groups, self, span, this](size_t i, - std::shared_ptr reader) mutable // need to add mutable to prevent thread_span constness + std::shared_ptr reader) mutable -> ::arrow::Result> { - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); - auto newspan = ::arrow::internal::tracing::GetTracer()->StartSpan( - std::string("arrow::parquet::DecodeRowGroups - read_column")); +#ifdef ARROW_WITH_OPENTELEMETRY + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); + ::arrow::util::tracing::Span childspan; + ::arrow::util::tracing::Span parentspan; + parentspan.Set(::arrow::util::tracing::Span::Impl{span}); + START_SPAN_WITH_PARENT(childspan, parentspan, "arrow::parquet::DecodeRowGroups - read_column"); +#endif std::shared_ptr<::arrow::ChunkedArray> column; RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); - newspan->End(); return column; }; auto make_table = [result_schema, row_groups, self, From e4fa2c9bc82914ee2603aaaf16712bd8864d9240 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 11 Mar 2022 16:41:14 +0100 Subject: [PATCH 09/42] Code formatting --- cpp/src/parquet/arrow/reader.cc | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index eaaf7ebccd4..42da917a55e 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1137,8 +1137,8 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, } #ifdef ARROW_WITH_OPENTELEMETRY auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); - row_group_generator = - ::arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(row_group_generator), span); + row_group_generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator( + std::move(row_group_generator), span); #endif return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); } @@ -1191,15 +1191,16 @@ Future> FileReaderImpl::DecodeRowGroups( #ifdef ARROW_WITH_OPENTELEMETRY auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); #endif - auto read_column = [row_groups, self, span, this](size_t i, - std::shared_ptr reader) mutable + auto read_column = [row_groups, self, span, this]( + size_t i, std::shared_ptr reader) mutable -> ::arrow::Result> { #ifdef ARROW_WITH_OPENTELEMETRY - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); - ::arrow::util::tracing::Span childspan; - ::arrow::util::tracing::Span parentspan; - parentspan.Set(::arrow::util::tracing::Span::Impl{span}); - START_SPAN_WITH_PARENT(childspan, parentspan, "arrow::parquet::DecodeRowGroups - read_column"); + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); + ::arrow::util::tracing::Span childspan; + ::arrow::util::tracing::Span parentspan; + parentspan.Set(::arrow::util::tracing::Span::Impl{span}); + START_SPAN_WITH_PARENT(childspan, parentspan, + "arrow::parquet::DecodeRowGroups - read_column"); #endif std::shared_ptr<::arrow::ChunkedArray> column; RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); From 6f97ba581d515c6278b769baccb6b9c7ac5a7bca Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 11 Mar 2022 21:02:05 +0100 Subject: [PATCH 10/42] Added some attributes to parquet column reader span --- cpp/src/parquet/arrow/reader.cc | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 42da917a55e..e117540ce07 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -273,6 +273,18 @@ class FileReaderImpl : public FileReader { records_to_read += reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values(); } +#ifdef ARROW_WITH_OPENTELEMETRY + std::string column_name = reader_->metadata()->schema()->Column(i)->name(); + auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); + ::arrow::util::tracing::Span childspan; + ::arrow::util::tracing::Span parentspan; + parentspan.Set(::arrow::util::tracing::Span::Impl{span}); + START_SPAN_WITH_PARENT(childspan, parentspan, + "parquet::arrow::read_column", + {{"parquet.arrow.columnindex", i}, + {"parquet.arrow.columnname", column_name}, + {"parquet.arrow.records_to_read", records_to_read}}); +#endif return reader->NextBatch(records_to_read, out); END_PARQUET_CATCH_EXCEPTIONS } @@ -1196,11 +1208,6 @@ Future> FileReaderImpl::DecodeRowGroups( -> ::arrow::Result> { #ifdef ARROW_WITH_OPENTELEMETRY auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); - ::arrow::util::tracing::Span childspan; - ::arrow::util::tracing::Span parentspan; - parentspan.Set(::arrow::util::tracing::Span::Impl{span}); - START_SPAN_WITH_PARENT(childspan, parentspan, - "arrow::parquet::DecodeRowGroups - read_column"); #endif std::shared_ptr<::arrow::ChunkedArray> column; RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); From 3e00a5f12ec68426a319c4e78780d4c646391aff Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 11 Mar 2022 21:14:46 +0100 Subject: [PATCH 11/42] TieSpanToAsyncGenerator now ending span when asyncgen finishes --- cpp/src/arrow/util/tracing_internal.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 0fbebb68510..277aa8bf923 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -126,10 +126,12 @@ AsyncGenerator TieSpanToAsyncGenerator( return wrapped().Then( [span](const T& result) -> Result { span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); return result; }, [span](const Status& status) -> Result { MarkSpan(status, span.get()); + span->End(); return status; }); }; From 8a16e2dbcb3cb4bb80ac61b8f614710370c83ac9 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 11 Mar 2022 21:54:50 +0100 Subject: [PATCH 12/42] Added some tracing macros to replace the prolific #ifdefs --- cpp/src/arrow/dataset/scanner.cc | 10 ++-------- cpp/src/arrow/util/tracing_internal.h | 26 ++++++++++++++++++++++++++ cpp/src/parquet/arrow/reader.cc | 24 +++++++----------------- 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index a8575e7bf1a..62af326d3dc 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -228,10 +228,7 @@ Result FragmentToBatches( MakeArrayOfNull(field->type(), /*length=*/0, options->pool)); columns.push_back(std::move(array)); } -#ifdef ARROW_WITH_OPENTELEMETRY - batch_gen = - arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(batch_gen), span); -#endif + TIE_SPAN_TO_GENERATOR(batch_gen); batch_gen = MakeDefaultIfEmptyGenerator( std::move(batch_gen), RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns))); @@ -253,10 +250,7 @@ Result> FragmentsToBatches( [=](const Enumerated>& fragment) { return FragmentToBatches(fragment, options); }); -#ifdef ARROW_WITH_OPENTELEMETRY - batch_gen_gen = arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( - std::move(batch_gen_gen)); -#endif + PROPAGATE_SPAN_TO_GENERATOR(std::move(batch_gen_gen)); return batch_gen_gen; } diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 277aa8bf923..f812a4adf6d 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -137,6 +137,15 @@ AsyncGenerator TieSpanToAsyncGenerator( }; } +/// \brief End the given span when the given async generator ends. +/// +/// The span will be made the active span each time the generator is called. +template +AsyncGenerator TieSpanToAsyncGenerator(AsyncGenerator wrapped) { + auto span = GetTracer()->GetCurrentSpan(); + return TieSpanToAsyncGenerator(wrapped, span); +} + /// \brief Activate the given span on each invocation of an async generator. template AsyncGenerator PropagateSpanThroughAsyncGenerator( @@ -201,6 +210,19 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( return st; \ }) +#define GET_CURRENT_SPAN(span) \ + auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan() + +#define SET_SPAN_SCOPE(scope, span) \ + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span) + +#define TIE_SPAN_TO_GENERATOR(generator) \ + generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(generator) + +#define PROPAGATE_SPAN_TO_GENERATOR(generator) \ + generator = \ + ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(generator) + #else class SpanImpl {}; @@ -211,6 +233,10 @@ class SpanImpl {}; #define EVENT(target_span, ...) #define END_SPAN(target_span) #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) +#define GET_CURRENT_SPAN(span) +#define SET_SPAN_SCOPE(scope, span) +#define TIE_SPAN_TO_GENERATOR(generator) +#define PROPAGATE_SPAN_TO_GENERATOR(generator) #endif diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index e117540ce07..e0ccd91d56e 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -275,6 +275,7 @@ class FileReaderImpl : public FileReader { } #ifdef ARROW_WITH_OPENTELEMETRY std::string column_name = reader_->metadata()->schema()->Column(i)->name(); + std::string phys_type = TypeToString(reader_->metadata()->schema()->Column(i)->physical_type()); auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); ::arrow::util::tracing::Span childspan; ::arrow::util::tracing::Span parentspan; @@ -283,6 +284,7 @@ class FileReaderImpl : public FileReader { "parquet::arrow::read_column", {{"parquet.arrow.columnindex", i}, {"parquet.arrow.columnname", column_name}, + {"parquet.arrow.physicaltype", phys_type}, {"parquet.arrow.records_to_read", records_to_read}}); #endif return reader->NextBatch(records_to_read, out); @@ -1076,13 +1078,9 @@ class RowGroupGenerator { auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); -#ifdef ARROW_WITH_OPENTELEMETRY - auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); -#endif + GET_CURRENT_SPAN(span); return ready.Then([=]() mutable -> ::arrow::Future { -#ifdef ARROW_WITH_OPENTELEMETRY - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); -#endif + SET_SPAN_SCOPE(scope, span); return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); }); } @@ -1147,11 +1145,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, row_group_generator = ::arrow::MakeReadaheadGenerator(std::move(row_group_generator), row_group_readahead); } -#ifdef ARROW_WITH_OPENTELEMETRY - auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); - row_group_generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator( - std::move(row_group_generator), span); -#endif + TIE_SPAN_TO_GENERATOR(std::move(row_group_generator)); return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); } @@ -1200,15 +1194,11 @@ Future> FileReaderImpl::DecodeRowGroups( // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); -#ifdef ARROW_WITH_OPENTELEMETRY - auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); -#endif + GET_CURRENT_SPAN(span); auto read_column = [row_groups, self, span, this]( size_t i, std::shared_ptr reader) mutable -> ::arrow::Result> { -#ifdef ARROW_WITH_OPENTELEMETRY - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); -#endif + SET_SPAN_SCOPE(scope, span); std::shared_ptr<::arrow::ChunkedArray> column; RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); return column; From 25e654d93a11b6c9049eb38d0e2e5fa507e55b97 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 18 Mar 2022 13:13:27 +0100 Subject: [PATCH 13/42] Processed review comments --- cpp/src/arrow/dataset/file_base.cc | 1 - cpp/src/parquet/arrow/reader.cc | 10 +++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 20fc4a4aade..f4551c27590 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -42,7 +42,6 @@ #include "arrow/util/map.h" #include "arrow/util/string.h" #include "arrow/util/task_group.h" -#include "arrow/util/tracing_internal.h" #include "arrow/util/variant.h" namespace arrow { diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index e0ccd91d56e..a2c60934b34 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -38,6 +38,7 @@ #include "arrow/util/make_unique.h" #include "arrow/util/parallel.h" #include "arrow/util/range.h" +#include "arrow/util/tracing_internal.h" #include "parquet/arrow/reader_internal.h" #include "parquet/column_reader.h" #include "parquet/exception.h" @@ -46,8 +47,6 @@ #include "parquet/properties.h" #include "parquet/schema.h" -#include "arrow/util/tracing_internal.h" - using arrow::Array; using arrow::ArrayData; using arrow::BooleanArray; @@ -276,11 +275,8 @@ class FileReaderImpl : public FileReader { #ifdef ARROW_WITH_OPENTELEMETRY std::string column_name = reader_->metadata()->schema()->Column(i)->name(); std::string phys_type = TypeToString(reader_->metadata()->schema()->Column(i)->physical_type()); - auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); - ::arrow::util::tracing::Span childspan; - ::arrow::util::tracing::Span parentspan; - parentspan.Set(::arrow::util::tracing::Span::Impl{span}); - START_SPAN_WITH_PARENT(childspan, parentspan, + ::arrow::util::tracing::Span span; + START_SPAN(span, "parquet::arrow::read_column", {{"parquet.arrow.columnindex", i}, {"parquet.arrow.columnname", column_name}, From 837d56ee37be57a6f82d2f4cd796c94bd08cc601 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 18 Mar 2022 13:25:35 +0100 Subject: [PATCH 14/42] Changed lambda capture list to = to prevent errors when building without opentelemetry --- cpp/src/parquet/arrow/reader.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index a2c60934b34..4c64afb994b 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1191,7 +1191,7 @@ Future> FileReaderImpl::DecodeRowGroups( if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); GET_CURRENT_SPAN(span); - auto read_column = [row_groups, self, span, this]( + auto read_column = [=]( size_t i, std::shared_ptr reader) mutable -> ::arrow::Result> { SET_SPAN_SCOPE(scope, span); @@ -1199,8 +1199,7 @@ Future> FileReaderImpl::DecodeRowGroups( RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); return column; }; - auto make_table = [result_schema, row_groups, self, - this](const ::arrow::ChunkedArrayVector& columns) + auto make_table = [=](const ::arrow::ChunkedArrayVector& columns) -> ::arrow::Result> { int64_t num_rows = 0; if (!columns.empty()) { From e742035d397bd4bd7e315b4687625bef79011b37 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 18 Mar 2022 15:20:03 +0100 Subject: [PATCH 15/42] Attempt at creating a struct wrapper for Future --- cpp/src/arrow/util/future.h | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index b374c77c81e..3539d4e13d2 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -32,6 +32,7 @@ #include "arrow/util/functional.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/type_fwd.h" #include "arrow/util/visibility.h" @@ -544,7 +545,19 @@ class ARROW_MUST_USE_TYPE Future { // We know impl_ will not be dangling when invoking callbacks because at least one // thread will be waiting for MarkFinished to return. Thus it's safe to keep a // weak reference to impl_ here - impl_->AddCallback(Callback{std::move(on_complete)}, opts); + struct { + Result func() { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + return wrapped(); + } + OnComplete wrapped; + opentelemetry::nostd::shared_ptr activeSpan; + std::string span_name; + } Wrapper; + Wrapper.wrapped = std::move(on_complete); + Wrapper.activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); + + impl_->AddCallback(Callback{std::move(Wrapper.func)}, opts); } /// \brief Overload of AddCallback that will return false instead of running From 17cb60993875d0b5dde5ae64dd7f60fdf7bc0345 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 18 Mar 2022 20:43:09 +0100 Subject: [PATCH 16/42] Forward declaring GetTracer in Future --- cpp/src/arrow/util/future.h | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 3539d4e13d2..64dbaa65794 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -32,10 +32,12 @@ #include "arrow/util/functional.h" #include "arrow/util/macros.h" #include "arrow/util/optional.h" -#include "arrow/util/tracing_internal.h" #include "arrow/util/type_fwd.h" #include "arrow/util/visibility.h" +#include +#include + namespace arrow { template @@ -354,6 +356,12 @@ class ARROW_EXPORT FutureWaiter { friend class ConcreteFutureImpl; }; +namespace internal { +namespace tracing { + opentelemetry::trace::Tracer *GetTracer(); +} +} + // --------------------------------------------------------------------- // Public API From dac4c0a3219699fc95f4e659cbe78c8527f3b938 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 18 Mar 2022 21:21:15 +0100 Subject: [PATCH 17/42] Modifications to wrapper struct, still not working --- cpp/src/arrow/util/future.h | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 64dbaa65794..761c4f31637 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -553,19 +553,19 @@ class ARROW_MUST_USE_TYPE Future { // We know impl_ will not be dangling when invoking callbacks because at least one // thread will be waiting for MarkFinished to return. Thus it's safe to keep a // weak reference to impl_ here - struct { - Result func() { + struct Wrapstruct { + void operator()() { auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); - return wrapped(); + func(); } - OnComplete wrapped; + OnComplete func; opentelemetry::nostd::shared_ptr activeSpan; - std::string span_name; - } Wrapper; - Wrapper.wrapped = std::move(on_complete); - Wrapper.activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); + }; + Wrapstruct wrapper; + wrapper.func = std::forward(on_complete); + wrapper.activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); - impl_->AddCallback(Callback{std::move(Wrapper.func)}, opts); + impl_->AddCallback(Callback{std::move(wrapper)}, opts); } /// \brief Overload of AddCallback that will return false instead of running From 05d4b4113032c059a288123faa3d289c3596d6a6 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 21 Mar 2022 13:32:46 +0100 Subject: [PATCH 18/42] Moved tracing span wrapper to ConcreteFutureImpl (still not working yet) --- cpp/src/arrow/util/future.cc | 18 +++++++++++++++++- cpp/src/arrow/util/future.h | 23 +---------------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index c398d992861..0ed019b6521 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -27,6 +27,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -241,8 +242,23 @@ class ConcreteFutureImpl : public FutureImpl { void AddCallback(Callback callback, CallbackOptions opts) { CheckOptions(opts); + struct Wrapstruct { + void operator()() { + bool trace_valid = activeSpan->GetContext().IsValid(); + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + std::move(func)(*self); + } + Callback func; + std::shared_ptr self; + opentelemetry::nostd::shared_ptr activeSpan; + }; + Wrapstruct wrapper; + wrapper.func = std::forward(callback); + wrapper.self = shared_from_this(); + wrapper.activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); + std::unique_lock lock(mutex_); - CallbackRecord callback_record{std::move(callback), opts}; + CallbackRecord callback_record{std::move(std::forward(wrapper)), opts}; if (IsFutureFinished(state_)) { lock.unlock(); RunOrScheduleCallback(shared_from_this(), std::move(callback_record), diff --git a/cpp/src/arrow/util/future.h b/cpp/src/arrow/util/future.h index 761c4f31637..b374c77c81e 100644 --- a/cpp/src/arrow/util/future.h +++ b/cpp/src/arrow/util/future.h @@ -35,9 +35,6 @@ #include "arrow/util/type_fwd.h" #include "arrow/util/visibility.h" -#include -#include - namespace arrow { template @@ -356,12 +353,6 @@ class ARROW_EXPORT FutureWaiter { friend class ConcreteFutureImpl; }; -namespace internal { -namespace tracing { - opentelemetry::trace::Tracer *GetTracer(); -} -} - // --------------------------------------------------------------------- // Public API @@ -553,19 +544,7 @@ class ARROW_MUST_USE_TYPE Future { // We know impl_ will not be dangling when invoking callbacks because at least one // thread will be waiting for MarkFinished to return. Thus it's safe to keep a // weak reference to impl_ here - struct Wrapstruct { - void operator()() { - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); - func(); - } - OnComplete func; - opentelemetry::nostd::shared_ptr activeSpan; - }; - Wrapstruct wrapper; - wrapper.func = std::forward(on_complete); - wrapper.activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); - - impl_->AddCallback(Callback{std::move(wrapper)}, opts); + impl_->AddCallback(Callback{std::move(on_complete)}, opts); } /// \brief Overload of AddCallback that will return false instead of running From f22cd449fac7ffaff7c95bd62cf4e4a56d24edb8 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 21 Mar 2022 14:22:42 +0100 Subject: [PATCH 19/42] Passing along tracing span to futures now seems to be working --- cpp/src/arrow/dataset/file_parquet.cc | 6 ------ cpp/src/arrow/util/future.cc | 12 ++++-------- cpp/src/parquet/arrow/reader.cc | 2 -- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index bbce60526fb..327dfcdd1fd 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -388,16 +388,10 @@ Result ParquetFileFormat::ScanBatchesAsync( pre_filtered = true; if (row_groups.empty()) return MakeEmptyGenerator>(); } -#ifdef ARROW_WITH_OPENTELEMETRY - auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); -#endif // Open the reader and pay the real IO cost. auto make_generator = [=](const std::shared_ptr& reader) mutable -> Result { -#ifdef ARROW_WITH_OPENTELEMETRY - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); -#endif // Ensure that parquet_fragment has FileMetaData RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get())); if (!pre_filtered) { diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index 0ed019b6521..274800fd9eb 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -243,22 +243,18 @@ class ConcreteFutureImpl : public FutureImpl { void AddCallback(Callback callback, CallbackOptions opts) { CheckOptions(opts); struct Wrapstruct { - void operator()() { + void operator()(const FutureImpl& impl) { bool trace_valid = activeSpan->GetContext().IsValid(); auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); - std::move(func)(*self); + std::move(func)(impl); } Callback func; - std::shared_ptr self; opentelemetry::nostd::shared_ptr activeSpan; }; - Wrapstruct wrapper; - wrapper.func = std::forward(callback); - wrapper.self = shared_from_this(); - wrapper.activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); + Wrapstruct wrapper{std::forward(callback), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; std::unique_lock lock(mutex_); - CallbackRecord callback_record{std::move(std::forward(wrapper)), opts}; + CallbackRecord callback_record{std::move(wrapper), opts}; if (IsFutureFinished(state_)) { lock.unlock(); RunOrScheduleCallback(shared_from_this(), std::move(callback_record), diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 4c64afb994b..5351edff8a9 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1074,9 +1074,7 @@ class RowGroupGenerator { auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); - GET_CURRENT_SPAN(span); return ready.Then([=]() mutable -> ::arrow::Future { - SET_SPAN_SCOPE(scope, span); return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); }); } From 9482c58673c9c7581bf4ac7555f00f596844e216 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 21 Mar 2022 14:44:58 +0100 Subject: [PATCH 20/42] Guarding trace code with #ifdef ARROW_WITH_OPENTELEMETRY --- cpp/src/arrow/util/future.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index 274800fd9eb..bde1f180cf8 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -242,6 +242,8 @@ class ConcreteFutureImpl : public FutureImpl { void AddCallback(Callback callback, CallbackOptions opts) { CheckOptions(opts); + std::unique_lock lock(mutex_); +#ifdef ARROW_WITH_OPENTELEMETRY struct Wrapstruct { void operator()(const FutureImpl& impl) { bool trace_valid = activeSpan->GetContext().IsValid(); @@ -253,8 +255,10 @@ class ConcreteFutureImpl : public FutureImpl { }; Wrapstruct wrapper{std::forward(callback), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; - std::unique_lock lock(mutex_); CallbackRecord callback_record{std::move(wrapper), opts}; +#else + CallbackRecord callback_record{std::move(callback), opts}; +#endif if (IsFutureFinished(state_)) { lock.unlock(); RunOrScheduleCallback(shared_from_this(), std::move(callback_record), From 287dfe77ef07df082767d997f0021f305b9fed95 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 21 Mar 2022 14:55:38 +0100 Subject: [PATCH 21/42] Code formatting --- cpp/src/arrow/util/future.cc | 6 ++++-- cpp/src/arrow/util/tracing_internal.h | 5 ++--- cpp/src/parquet/arrow/reader.cc | 18 ++++++++---------- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index bde1f180cf8..a97cf0563cd 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -245,7 +245,7 @@ class ConcreteFutureImpl : public FutureImpl { std::unique_lock lock(mutex_); #ifdef ARROW_WITH_OPENTELEMETRY struct Wrapstruct { - void operator()(const FutureImpl& impl) { + void operator()(const FutureImpl& impl) { bool trace_valid = activeSpan->GetContext().IsValid(); auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); std::move(func)(impl); @@ -253,7 +253,9 @@ class ConcreteFutureImpl : public FutureImpl { Callback func; opentelemetry::nostd::shared_ptr activeSpan; }; - Wrapstruct wrapper{std::forward(callback), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + Wrapstruct wrapper; + wrapper.func = std::forward(callback); + wrapper.activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); CallbackRecord callback_record{std::move(wrapper), opts}; #else diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index f812a4adf6d..6fc393e6f69 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -217,11 +217,10 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span) #define TIE_SPAN_TO_GENERATOR(generator) \ - generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(generator) + generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(generator) #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ - generator = \ - ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(generator) + generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(generator) #else diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 5351edff8a9..b4c3efe622b 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -68,7 +68,6 @@ using arrow::TimestampArray; using arrow::internal::checked_cast; using arrow::internal::Iota; - // Help reduce verbosity using ParquetReader = parquet::ParquetFileReader; @@ -274,14 +273,14 @@ class FileReaderImpl : public FileReader { } #ifdef ARROW_WITH_OPENTELEMETRY std::string column_name = reader_->metadata()->schema()->Column(i)->name(); - std::string phys_type = TypeToString(reader_->metadata()->schema()->Column(i)->physical_type()); + std::string phys_type = + TypeToString(reader_->metadata()->schema()->Column(i)->physical_type()); ::arrow::util::tracing::Span span; - START_SPAN(span, - "parquet::arrow::read_column", - {{"parquet.arrow.columnindex", i}, - {"parquet.arrow.columnname", column_name}, - {"parquet.arrow.physicaltype", phys_type}, - {"parquet.arrow.records_to_read", records_to_read}}); + START_SPAN(span, "parquet::arrow::read_column", + {{"parquet.arrow.columnindex", i}, + {"parquet.arrow.columnname", column_name}, + {"parquet.arrow.physicaltype", phys_type}, + {"parquet.arrow.records_to_read", records_to_read}}); #endif return reader->NextBatch(records_to_read, out); END_PARQUET_CATCH_EXCEPTIONS @@ -1189,8 +1188,7 @@ Future> FileReaderImpl::DecodeRowGroups( if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); GET_CURRENT_SPAN(span); - auto read_column = [=]( - size_t i, std::shared_ptr reader) mutable + auto read_column = [=](size_t i, std::shared_ptr reader) mutable -> ::arrow::Result> { SET_SPAN_SCOPE(scope, span); std::shared_ptr<::arrow::ChunkedArray> column; From e3684b95999e7d9fac69b2bc231e4b29aeeb6838 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 21 Mar 2022 16:04:17 +0100 Subject: [PATCH 22/42] Removed unused line --- cpp/src/arrow/util/future.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index a97cf0563cd..16d9bd58761 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -246,7 +246,6 @@ class ConcreteFutureImpl : public FutureImpl { #ifdef ARROW_WITH_OPENTELEMETRY struct Wrapstruct { void operator()(const FutureImpl& impl) { - bool trace_valid = activeSpan->GetContext().IsValid(); auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); std::move(func)(impl); } From 2a52f65b0425add2368b75d4a899266d2cca984d Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 22 Mar 2022 12:33:45 +0100 Subject: [PATCH 23/42] Changes otel macros to work similar to ARROW_ASSIGN... --- cpp/src/arrow/util/tracing_internal.h | 12 ++++++------ cpp/src/parquet/arrow/reader.cc | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 6fc393e6f69..873155f12af 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -210,11 +210,11 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( return st; \ }) -#define GET_CURRENT_SPAN(span) \ - auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan() +#define GET_CURRENT_SPAN(lhs) \ + lhs = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan() -#define SET_SPAN_SCOPE(scope, span) \ - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span) +#define SET_SPAN_SCOPE(lhs, span) \ + lhs = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span) #define TIE_SPAN_TO_GENERATOR(generator) \ generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(generator) @@ -232,8 +232,8 @@ class SpanImpl {}; #define EVENT(target_span, ...) #define END_SPAN(target_span) #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) -#define GET_CURRENT_SPAN(span) -#define SET_SPAN_SCOPE(scope, span) +#define GET_CURRENT_SPAN(lhs) +#define SET_SPAN_SCOPE(lhs, span) #define TIE_SPAN_TO_GENERATOR(generator) #define PROPAGATE_SPAN_TO_GENERATOR(generator) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index b4c3efe622b..48e88bcd152 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1187,10 +1187,10 @@ Future> FileReaderImpl::DecodeRowGroups( // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); - GET_CURRENT_SPAN(span); + GET_CURRENT_SPAN(auto span); auto read_column = [=](size_t i, std::shared_ptr reader) mutable -> ::arrow::Result> { - SET_SPAN_SCOPE(scope, span); + SET_SPAN_SCOPE(auto scope, span); std::shared_ptr<::arrow::ChunkedArray> column; RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); return column; From fef837e7bfb8a72f9ca152f2648329908ac6d958 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 22 Mar 2022 12:34:08 +0100 Subject: [PATCH 24/42] Attempt at forward otel spans through Executor::Submit (not working yet) --- cpp/src/arrow/util/thread_pool.h | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index a104e0e3590..de909795e2d 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -39,6 +39,9 @@ #include "arrow/util/macros.h" #include "arrow/util/visibility.h" +#include +#include + #if defined(_MSC_VER) // Disable harmless warning for decorated name length limit #pragma warning(disable : 4503) @@ -78,6 +81,11 @@ struct TaskHints { int64_t external_id = -1; }; +// Forward declare to prevent including tracing_internal.h +namespace tracing { + opentelemetry::trace::Tracer* GetTracer(); +} + class ARROW_EXPORT Executor { public: using StopCallback = internal::FnOnce; @@ -149,10 +157,32 @@ class ARROW_EXPORT Executor { Result Submit(TaskHints hints, StopToken stop_token, Function&& func, Args&&... args) { using ValueType = typename FutureType::ValueType; - auto future = FutureType::Make(); + +#ifdef ARROW_WITH_OPENTELEMETRY + struct { + void operator()(Args&&... args) { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + std::move(func)(args...); + } + Function func; + opentelemetry::nostd::shared_ptr activeSpan; + } wrapper{std::forward(func), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + auto task = std::bind(::arrow::detail::ContinueFuture{}, future, + std::forward(wrapper), std::forward(args)...); +#else auto task = std::bind(::arrow::detail::ContinueFuture{}, future, std::forward(func), std::forward(args)...); +#endif + +// auto activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); +// auto wrapper_func = [=] (Args&&... args) mutable { +// auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); +// std::move(func)(std::forward(args)...); +// }; +// +// auto task = std::bind(::arrow::detail::ContinueFuture{}, future, +// std::forward(wrapper_func), std::forward(args)...); struct { WeakFuture weak_fut; From e4f0fdee7c787d6cb89f049b879604a11f40c431 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 28 Mar 2022 17:50:35 +0200 Subject: [PATCH 25/42] Using std::function to wrap the struct (now templated with Func return type) --- cpp/src/arrow/util/thread_pool.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index de909795e2d..786c323880d 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -152,6 +152,7 @@ class ARROW_EXPORT Executor { // will return the callable's result value once. // The callable's arguments are copied before execution. template , typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature< Function && (Args && ...)>> Result Submit(TaskHints hints, StopToken stop_token, Function&& func, @@ -161,15 +162,15 @@ class ARROW_EXPORT Executor { #ifdef ARROW_WITH_OPENTELEMETRY struct { - void operator()(Args&&... args) { + FuncResult operator()(Args&&... args) { auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); - std::move(func)(args...); + return std::move(func)(args...); } Function func; opentelemetry::nostd::shared_ptr activeSpan; } wrapper{std::forward(func), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; auto task = std::bind(::arrow::detail::ContinueFuture{}, future, - std::forward(wrapper), std::forward(args)...); + std::function(wrapper), std::forward(args)...); #else auto task = std::bind(::arrow::detail::ContinueFuture{}, future, std::forward(func), std::forward(args)...); From 4573548b6001721a995ee2b1f51f6e8ba6878957 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 30 Mar 2022 10:11:32 +0200 Subject: [PATCH 26/42] Moved Executor task wrapping down into SpawnReal --- cpp/src/arrow/util/thread_pool.cc | 28 ++++++++++++++++++++++++++++ cpp/src/arrow/util/thread_pool.h | 31 ------------------------------- cpp/src/parquet/arrow/reader.cc | 2 -- 3 files changed, 28 insertions(+), 33 deletions(-) diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index a1387947e3a..1b8a6b2de81 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -30,6 +30,8 @@ #include "arrow/util/logging.h" #include "arrow/util/mutex.h" +#include "arrow/util/tracing_internal.h" + namespace arrow { namespace internal { @@ -58,6 +60,19 @@ SerialExecutor::~SerialExecutor() = default; Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, StopToken stop_token, StopCallback&& stop_callback) { +#ifdef ARROW_WITH_OPENTELEMETRY + //Wrap the task to propagate a parent tracing span to it + struct { + void operator()() { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + std::move(func)(); + } + FnOnce func; + opentelemetry::nostd::shared_ptr activeSpan; + } wrapper{std::forward>(task), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + task = std::move(wrapper); +#endif // While the SerialExecutor runs tasks synchronously on its main thread, // SpawnReal may be called from external threads (e.g. when transferring back // from blocking I/O threads), so we need to keep the state alive *and* to @@ -366,6 +381,19 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto // We can still spin up more workers so spin up a new worker LaunchWorkersUnlocked(/*threads=*/1); } +#ifdef ARROW_WITH_OPENTELEMETRY + //Wrap the task to propagate a parent tracing span to it + struct { + void operator()() { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + std::move(func)(); + } + FnOnce func; + opentelemetry::nostd::shared_ptr activeSpan; + } wrapper{std::forward>(task), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + task = std::move(wrapper); +#endif state_->pending_tasks_.push_back( {std::move(task), std::move(stop_token), std::move(stop_callback)}); } diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 786c323880d..ea3cb57e463 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -39,9 +39,6 @@ #include "arrow/util/macros.h" #include "arrow/util/visibility.h" -#include -#include - #if defined(_MSC_VER) // Disable harmless warning for decorated name length limit #pragma warning(disable : 4503) @@ -81,11 +78,6 @@ struct TaskHints { int64_t external_id = -1; }; -// Forward declare to prevent including tracing_internal.h -namespace tracing { - opentelemetry::trace::Tracer* GetTracer(); -} - class ARROW_EXPORT Executor { public: using StopCallback = internal::FnOnce; @@ -159,31 +151,8 @@ class ARROW_EXPORT Executor { Args&&... args) { using ValueType = typename FutureType::ValueType; auto future = FutureType::Make(); - -#ifdef ARROW_WITH_OPENTELEMETRY - struct { - FuncResult operator()(Args&&... args) { - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); - return std::move(func)(args...); - } - Function func; - opentelemetry::nostd::shared_ptr activeSpan; - } wrapper{std::forward(func), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; - auto task = std::bind(::arrow::detail::ContinueFuture{}, future, - std::function(wrapper), std::forward(args)...); -#else auto task = std::bind(::arrow::detail::ContinueFuture{}, future, std::forward(func), std::forward(args)...); -#endif - -// auto activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); -// auto wrapper_func = [=] (Args&&... args) mutable { -// auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); -// std::move(func)(std::forward(args)...); -// }; -// -// auto task = std::bind(::arrow::detail::ContinueFuture{}, future, -// std::forward(wrapper_func), std::forward(args)...); struct { WeakFuture weak_fut; diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 48e88bcd152..a9cebd76112 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1187,10 +1187,8 @@ Future> FileReaderImpl::DecodeRowGroups( // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); - GET_CURRENT_SPAN(auto span); auto read_column = [=](size_t i, std::shared_ptr reader) mutable -> ::arrow::Result> { - SET_SPAN_SCOPE(auto scope, span); std::shared_ptr<::arrow::ChunkedArray> column; RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); return column; From 61ffaae69aa832950181c9f03dc58a275fa0b43d Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 30 Mar 2022 10:26:41 +0200 Subject: [PATCH 27/42] Formatting --- cpp/src/arrow/util/thread_pool.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 1b8a6b2de81..af9b38ffd4f 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -61,14 +61,14 @@ SerialExecutor::~SerialExecutor() = default; Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, StopToken stop_token, StopCallback&& stop_callback) { #ifdef ARROW_WITH_OPENTELEMETRY - //Wrap the task to propagate a parent tracing span to it + // Wrap the task to propagate a parent tracing span to it struct { - void operator()() { - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); - std::move(func)(); - } - FnOnce func; - opentelemetry::nostd::shared_ptr activeSpan; + void operator()() { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + std::move(func)(); + } + FnOnce func; + opentelemetry::nostd::shared_ptr activeSpan; } wrapper{std::forward>(task), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; task = std::move(wrapper); @@ -382,7 +382,7 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto LaunchWorkersUnlocked(/*threads=*/1); } #ifdef ARROW_WITH_OPENTELEMETRY - //Wrap the task to propagate a parent tracing span to it + // Wrap the task to propagate a parent tracing span to it struct { void operator()() { auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); From 61c7af52906f0841283f567d67ec4e87f82e28db Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 30 Mar 2022 14:47:54 +0200 Subject: [PATCH 28/42] Addressed review comments --- cpp/src/arrow/util/future.cc | 17 +++++++---------- cpp/src/arrow/util/thread_pool.cc | 11 ++++++----- cpp/src/arrow/util/thread_pool.h | 2 +- cpp/src/parquet/arrow/reader.cc | 6 +++--- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index 16d9bd58761..ca4290c5b08 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -244,22 +244,19 @@ class ConcreteFutureImpl : public FutureImpl { CheckOptions(opts); std::unique_lock lock(mutex_); #ifdef ARROW_WITH_OPENTELEMETRY - struct Wrapstruct { + struct SpanWrapper { void operator()(const FutureImpl& impl) { - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); std::move(func)(impl); } Callback func; - opentelemetry::nostd::shared_ptr activeSpan; + opentelemetry::nostd::shared_ptr active_span; }; - Wrapstruct wrapper; - wrapper.func = std::forward(callback); - wrapper.activeSpan = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); - - CallbackRecord callback_record{std::move(wrapper), opts}; -#else - CallbackRecord callback_record{std::move(callback), opts}; + SpanWrapper wrapper{std::move(callback), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + callback = std::move(wrapper); #endif + CallbackRecord callback_record{std::move(callback), opts}; if (IsFutureFinished(state_)) { lock.unlock(); RunOrScheduleCallback(shared_from_this(), std::move(callback_record), diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index af9b38ffd4f..8a27e171943 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -62,15 +62,16 @@ Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, StopToken stop_token, StopCallback&& stop_callback) { #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it - struct { + struct SpanWrapper { void operator()() { - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); std::move(func)(); } FnOnce func; - opentelemetry::nostd::shared_ptr activeSpan; - } wrapper{std::forward>(task), - ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + opentelemetry::nostd::shared_ptr active_span; + }; + SpanWrapper wrapper{std::move(task), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; task = std::move(wrapper); #endif // While the SerialExecutor runs tasks synchronously on its main thread, diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index ea3cb57e463..a104e0e3590 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -144,12 +144,12 @@ class ARROW_EXPORT Executor { // will return the callable's result value once. // The callable's arguments are copied before execution. template , typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature< Function && (Args && ...)>> Result Submit(TaskHints hints, StopToken stop_token, Function&& func, Args&&... args) { using ValueType = typename FutureType::ValueType; + auto future = FutureType::Make(); auto task = std::bind(::arrow::detail::ContinueFuture{}, future, std::forward(func), std::forward(args)...); diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index a9cebd76112..83a8fe70950 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1072,8 +1072,7 @@ class RowGroupGenerator { } auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices); if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); - - return ready.Then([=]() mutable -> ::arrow::Future { + return ready.Then([=]() -> ::arrow::Future { return ReadOneRowGroup(cpu_executor_, reader, row_group, column_indices); }); } @@ -1187,7 +1186,8 @@ Future> FileReaderImpl::DecodeRowGroups( // OptionalParallelForAsync requires an executor if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool(); - auto read_column = [=](size_t i, std::shared_ptr reader) mutable + auto read_column = [row_groups, self, this](size_t i, + std::shared_ptr reader) -> ::arrow::Result> { std::shared_ptr<::arrow::ChunkedArray> column; RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); From d3cd60b6492ed57c24a69016218413b76b168773 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 30 Mar 2022 14:52:05 +0200 Subject: [PATCH 29/42] Reverted some unneeded changes --- cpp/src/arrow/dataset/file_parquet.cc | 3 ++- cpp/src/parquet/arrow/reader.cc | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 327dfcdd1fd..22e8c0db23d 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -368,7 +368,8 @@ Future> ParquetFileFormat::GetReader &arrow_reader)); return std::move(arrow_reader); }, - [=](const Status& status) -> Result> { + [path]( + const Status& status) -> Result> { return WrapSourceError(status, path); }); } diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 83a8fe70950..4ab20db556c 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1193,7 +1193,8 @@ Future> FileReaderImpl::DecodeRowGroups( RETURN_NOT_OK(ReadColumn(static_cast(i), row_groups, reader.get(), &column)); return column; }; - auto make_table = [=](const ::arrow::ChunkedArrayVector& columns) + auto make_table = [result_schema, row_groups, self, + this](const ::arrow::ChunkedArrayVector& columns) -> ::arrow::Result> { int64_t num_rows = 0; if (!columns.empty()) { From b8f6c2fa3232c9abdfeee47370330cf7e13ad371 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 30 Mar 2022 15:04:25 +0200 Subject: [PATCH 30/42] Added another guarded macro --- cpp/src/arrow/dataset/file_parquet.cc | 5 +---- cpp/src/arrow/util/tracing_internal.h | 4 ++++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 22e8c0db23d..5e313b9ad62 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -418,10 +418,7 @@ Result ParquetFileFormat::ScanBatchesAsync( }; auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) .Then(std::move(make_generator))); -#ifdef ARROW_WITH_OPENTELEMETRY - generator = arrow::internal::tracing::WrapAsyncGenerator( - std::move(generator), "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); -#endif + WRAP_ASYNC_GENERATOR(generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); return generator; } diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 873155f12af..153f7b2a799 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -222,6 +222,9 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(generator) +#define WRAP_ASYNC_GENERATOR(generator, name) \ + generator = ::arrow::internal::tracing::WrapAsyncGenerator(generator, name) + #else class SpanImpl {}; @@ -236,6 +239,7 @@ class SpanImpl {}; #define SET_SPAN_SCOPE(lhs, span) #define TIE_SPAN_TO_GENERATOR(generator) #define PROPAGATE_SPAN_TO_GENERATOR(generator) +#define WRAP_ASYNC_GENERATOR(generator, name) #endif From 78e528f81b79f925438a45e4b5d7a4af8166ad19 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 30 Mar 2022 15:09:18 +0200 Subject: [PATCH 31/42] Formatting --- cpp/src/arrow/dataset/file_parquet.cc | 5 +++-- cpp/src/parquet/arrow/reader.cc | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 5e313b9ad62..a3f617c28d0 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -369,7 +369,7 @@ Future> ParquetFileFormat::GetReader return std::move(arrow_reader); }, [path]( - const Status& status) -> Result> { + const Status& status) -> Result> { return WrapSourceError(status, path); }); } @@ -418,7 +418,8 @@ Result ParquetFileFormat::ScanBatchesAsync( }; auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) .Then(std::move(make_generator))); - WRAP_ASYNC_GENERATOR(generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); + WRAP_ASYNC_GENERATOR(generator, + "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); return generator; } diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 4ab20db556c..40962baf04a 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1194,7 +1194,7 @@ Future> FileReaderImpl::DecodeRowGroups( return column; }; auto make_table = [result_schema, row_groups, self, - this](const ::arrow::ChunkedArrayVector& columns) + this](const ::arrow::ChunkedArrayVector& columns) -> ::arrow::Result> { int64_t num_rows = 0; if (!columns.empty()) { From 42798cccbb08831bd69a09c847f1749b82daaafd Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 30 Mar 2022 15:24:19 +0200 Subject: [PATCH 32/42] Moving the generators when wrapping them --- cpp/src/arrow/util/tracing_internal.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 153f7b2a799..cfd36a1c90c 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -217,13 +217,13 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( lhs = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span) #define TIE_SPAN_TO_GENERATOR(generator) \ - generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(generator) + generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(generator)) #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ - generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(generator) + generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(std::move(generator)) #define WRAP_ASYNC_GENERATOR(generator, name) \ - generator = ::arrow::internal::tracing::WrapAsyncGenerator(generator, name) + generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name) #else From 9abf327b3c6ecb313fbe4e5cbca8b1bc9252fcac Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 30 Mar 2022 15:29:06 +0200 Subject: [PATCH 33/42] Formatting --- cpp/src/arrow/util/tracing_internal.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index cfd36a1c90c..16898cccf09 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -219,8 +219,9 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( #define TIE_SPAN_TO_GENERATOR(generator) \ generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(generator)) -#define PROPAGATE_SPAN_TO_GENERATOR(generator) \ - generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(std::move(generator)) +#define PROPAGATE_SPAN_TO_GENERATOR(generator) \ + generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( \ + std::move(generator)) #define WRAP_ASYNC_GENERATOR(generator, name) \ generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name) From b1d46895552e073825e9e93eba5cf22415503314 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 1 Apr 2022 15:09:56 +0200 Subject: [PATCH 34/42] Manually setting the current span as parent is not needed --- cpp/src/arrow/dataset/file_ipc.cc | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 79f2eafae05..3e82e3ff8cd 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -171,13 +171,8 @@ Result IpcFileFormat::ScanBatchesAsync( ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( /*coalesce=*/false, options->io_context)); } -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::trace::StartSpanOptions span_options; - span_options.parent = parent_span->GetContext(); - generator = arrow::internal::tracing::WrapAsyncGenerator( - std::move(generator), std::move(span_options), - "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next"); -#endif + WRAP_ASYNC_GENERATOR(generator, + "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next"); auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level); return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size); }; From f32fda8b086843444fb6d0afac1f10f760511e56 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 1 Apr 2022 15:36:30 +0200 Subject: [PATCH 35/42] Removed some helper functions that were too similar in nature --- cpp/src/arrow/util/tracing_internal.h | 53 ++++++--------------------- 1 file changed, 11 insertions(+), 42 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 16898cccf09..a8e88fde639 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -62,37 +62,17 @@ inline Result MarkSpan(Result result, opentelemetry::trace::Span* span) { return result; } -template -Iterator WrapIterator( - Iterator wrapped, - opentelemetry::nostd::shared_ptr parent_span, - const std::string& span_name) { - struct { - Result operator()() { - opentelemetry::trace::StartSpanOptions options; - options.parent = parent_span->GetContext(); - auto span = GetTracer()->StartSpan(span_name, options); - auto scope = GetTracer()->WithActiveSpan(span); - return wrapped.Next(); - } - - Iterator wrapped; - opentelemetry::nostd::shared_ptr parent_span; - std::string span_name; - } Wrapper; - Wrapper.wrapped = std::move(wrapped); - Wrapper.parent_span = std::move(parent_span); - Wrapper.span_name = span_name; - return MakeFunctionIterator(std::move(Wrapper)); -} - +/// \brief Start a new span for each invocation of a generator. +/// +/// The parent span of the new span will be the currently active span +/// (if any) as of when WrapAsyncGenerator was itself called. template AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, - opentelemetry::trace::StartSpanOptions options, const std::string& span_name) { + auto active_span = GetTracer()->GetCurrentSpan(); return [=]() mutable -> Future { - auto span = GetTracer()->StartSpan(span_name, {}, options); - auto scope = GetTracer()->WithActiveSpan(span); + auto scope = GetTracer()->WithActiveSpan(active_span); + auto span = GetTracer()->StartSpan(span_name); auto fut = wrapped(); fut.AddCallback([span](const Result& result) { MarkSpan(result.status(), span.get()); @@ -102,18 +82,6 @@ AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, }; } -/// \brief Start a new span for each invocation of a generator. -/// -/// The parent span of the new span will be the currently active span -/// (if any) as of when WrapAsyncGenerator was itself called. -template -AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, - const std::string& span_name) { - opentelemetry::trace::StartSpanOptions options; - options.parent = GetTracer()->GetCurrentSpan()->GetContext(); - return WrapAsyncGenerator(std::move(wrapped), std::move(options), span_name); -} - /// \brief End the given span when the given async generator ends. /// /// The span will be made the active span each time the generator is called. @@ -137,16 +105,17 @@ AsyncGenerator TieSpanToAsyncGenerator( }; } -/// \brief End the given span when the given async generator ends. +/// \brief End the current span when the given async generator ends. /// /// The span will be made the active span each time the generator is called. template AsyncGenerator TieSpanToAsyncGenerator(AsyncGenerator wrapped) { auto span = GetTracer()->GetCurrentSpan(); + if (!span->GetContext().IsValid()) return wrapped; return TieSpanToAsyncGenerator(wrapped, span); } -/// \brief Activate the given span on each invocation of an async generator. +/// \brief Propagate the given span to each invocation of an async generator. template AsyncGenerator PropagateSpanThroughAsyncGenerator( AsyncGenerator wrapped, @@ -157,7 +126,7 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator( }; } -/// \brief Activate the given span on each invocation of an async generator. +/// \brief Propagate the currently active span to each invocation of an async generator. template AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) { auto span = GetTracer()->GetCurrentSpan(); From 71f6989103731be357656f2f9f028130f483adb4 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 1 Apr 2022 16:30:36 +0200 Subject: [PATCH 36/42] Merged similar tracing functions regarding generators, updated/added some comments --- cpp/src/arrow/dataset/file_csv.cc | 5 +- cpp/src/arrow/dataset/file_ipc.cc | 2 +- cpp/src/arrow/dataset/file_parquet.cc | 2 +- cpp/src/arrow/dataset/scanner.cc | 2 +- cpp/src/arrow/util/tracing_internal.h | 72 ++++++++------------------- cpp/src/parquet/arrow/reader.cc | 2 +- 6 files changed, 27 insertions(+), 58 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index ad47529529f..adef6487df9 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -266,10 +266,7 @@ Result CsvFileFormat::ScanBatchesAsync( auto reader_fut = OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool()); auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); -#ifdef ARROW_WITH_OPENTELEMETRY - generator = arrow::internal::tracing::WrapAsyncGenerator( - std::move(generator), "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next"); -#endif + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next"); return generator; } diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 3e82e3ff8cd..de400be893d 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -171,7 +171,7 @@ Result IpcFileFormat::ScanBatchesAsync( ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( /*coalesce=*/false, options->io_context)); } - WRAP_ASYNC_GENERATOR(generator, + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next"); auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level); return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size); diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index a3f617c28d0..4a36e318ec4 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -418,7 +418,7 @@ Result ParquetFileFormat::ScanBatchesAsync( }; auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) .Then(std::move(make_generator))); - WRAP_ASYNC_GENERATOR(generator, + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); return generator; } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 62af326d3dc..ab8026d5ab2 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -228,7 +228,7 @@ Result FragmentToBatches( MakeArrayOfNull(field->type(), /*length=*/0, options->pool)); columns.push_back(std::move(array)); } - TIE_SPAN_TO_GENERATOR(batch_gen); + WRAP_ASYNC_GENERATOR(batch_gen); batch_gen = MakeDefaultIfEmptyGenerator( std::move(batch_gen), RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns))); diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index a8e88fde639..6c549f43807 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -68,53 +68,23 @@ inline Result MarkSpan(Result result, opentelemetry::trace::Span* span) { /// (if any) as of when WrapAsyncGenerator was itself called. template AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, - const std::string& span_name) { + const std::string& span_name = "", bool create_childspan = false) { auto active_span = GetTracer()->GetCurrentSpan(); return [=]() mutable -> Future { + auto span = active_span; auto scope = GetTracer()->WithActiveSpan(active_span); - auto span = GetTracer()->StartSpan(span_name); auto fut = wrapped(); - fut.AddCallback([span](const Result& result) { - MarkSpan(result.status(), span.get()); - span->End(); - }); - return fut; - }; -} - -/// \brief End the given span when the given async generator ends. -/// -/// The span will be made the active span each time the generator is called. -template -AsyncGenerator TieSpanToAsyncGenerator( - AsyncGenerator wrapped, - opentelemetry::nostd::shared_ptr span) { - return [=]() mutable -> Future { - auto scope = GetTracer()->WithActiveSpan(span); - return wrapped().Then( - [span](const T& result) -> Result { - span->SetStatus(opentelemetry::trace::StatusCode::kOk); - span->End(); - return result; - }, - [span](const Status& status) -> Result { - MarkSpan(status, span.get()); + if (create_childspan) { + span = GetTracer()->StartSpan(span_name); + } + fut.AddCallback([span](const Result &result) { + MarkSpan(result.status(), span.get()); span->End(); - return status; - }); + }); + return fut; }; } -/// \brief End the current span when the given async generator ends. -/// -/// The span will be made the active span each time the generator is called. -template -AsyncGenerator TieSpanToAsyncGenerator(AsyncGenerator wrapped) { - auto span = GetTracer()->GetCurrentSpan(); - if (!span->GetContext().IsValid()) return wrapped; - return TieSpanToAsyncGenerator(wrapped, span); -} - /// \brief Propagate the given span to each invocation of an async generator. template AsyncGenerator PropagateSpanThroughAsyncGenerator( @@ -134,6 +104,14 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); } +/* + * PropagateSpanThroughAsyncGenerator - only makes sure the trace context is propagated, + * so that spans created when running generator instances asynchronously do not + * end up in a separate, disconnected trace. + * WrapAsyncGenerator - Connect the current span to the generator. When the generator + * finishes, it will stop the span. Optionally, create child spans at each invocation. + */ + class SpanImpl { public: opentelemetry::nostd::shared_ptr span; @@ -179,21 +157,15 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( return st; \ }) -#define GET_CURRENT_SPAN(lhs) \ - lhs = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan() - -#define SET_SPAN_SCOPE(lhs, span) \ - lhs = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span) - -#define TIE_SPAN_TO_GENERATOR(generator) \ - generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(generator)) - #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( \ std::move(generator)) -#define WRAP_ASYNC_GENERATOR(generator, name) \ - generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name) +#define WRAP_ASYNC_GENERATOR(generator) \ + generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator)) + +#define WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, name) \ + generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name, true) #else diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 40962baf04a..bc6754703d6 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1137,7 +1137,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, row_group_generator = ::arrow::MakeReadaheadGenerator(std::move(row_group_generator), row_group_readahead); } - TIE_SPAN_TO_GENERATOR(std::move(row_group_generator)); + WRAP_ASYNC_GENERATOR(std::move(row_group_generator)); return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); } From 4a82a7a12c6bbc344e96c1e0415c1bd605f11544 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 1 Apr 2022 16:32:41 +0200 Subject: [PATCH 37/42] Addressed review comment --- cpp/src/arrow/dataset/file_csv.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index adef6487df9..c5364cae8f7 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -175,7 +175,7 @@ static inline Future> OpenReaderAsync( ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); - auto path = source.path(); + const auto& path = source.path(); ARROW_ASSIGN_OR_RAISE( input, io::BufferedInputStream::Create(reader_options.block_size, default_memory_pool(), std::move(input))); From 90e276808dbd7941596e29b80f6af284f2bf3a1b Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 1 Apr 2022 16:37:32 +0200 Subject: [PATCH 38/42] Formatting --- cpp/src/arrow/dataset/file_csv.cc | 3 ++- cpp/src/arrow/dataset/file_ipc.cc | 4 ++-- cpp/src/arrow/dataset/file_parquet.cc | 4 ++-- cpp/src/arrow/util/tracing_internal.h | 14 ++++++++------ 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index c5364cae8f7..277bab29a09 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -266,7 +266,8 @@ Result CsvFileFormat::ScanBatchesAsync( auto reader_fut = OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool()); auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); - WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next"); + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN( + generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next"); return generator; } diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index de400be893d..260dd6278ce 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -171,8 +171,8 @@ Result IpcFileFormat::ScanBatchesAsync( ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( /*coalesce=*/false, options->io_context)); } - WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, - "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next"); + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN( + generator, "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next"); auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level); return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size); }; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 4a36e318ec4..4a8d4093124 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -418,8 +418,8 @@ Result ParquetFileFormat::ScanBatchesAsync( }; auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) .Then(std::move(make_generator))); - WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, - "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN( + generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); return generator; } diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 6c549f43807..33280b95514 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -68,7 +68,8 @@ inline Result MarkSpan(Result result, opentelemetry::trace::Span* span) { /// (if any) as of when WrapAsyncGenerator was itself called. template AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, - const std::string& span_name = "", bool create_childspan = false) { + const std::string& span_name = "", + bool create_childspan = false) { auto active_span = GetTracer()->GetCurrentSpan(); return [=]() mutable -> Future { auto span = active_span; @@ -77,10 +78,10 @@ AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, if (create_childspan) { span = GetTracer()->StartSpan(span_name); } - fut.AddCallback([span](const Result &result) { - MarkSpan(result.status(), span.get()); - span->End(); - }); + fut.AddCallback([span](const Result& result) { + MarkSpan(result.status(), span.get()); + span->End(); + }); return fut; }; } @@ -165,7 +166,8 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator)) #define WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, name) \ - generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name, true) + generator = \ + ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name, true) #else From 3e24f549ab54ba9088c9e7f911a846439ad58a4c Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 1 Apr 2022 16:50:09 +0200 Subject: [PATCH 39/42] Updated macros for when opentelemetry is disabled --- cpp/src/arrow/util/tracing_internal.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 33280b95514..43a8da0f2f5 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -179,11 +179,9 @@ class SpanImpl {}; #define EVENT(target_span, ...) #define END_SPAN(target_span) #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) -#define GET_CURRENT_SPAN(lhs) -#define SET_SPAN_SCOPE(lhs, span) -#define TIE_SPAN_TO_GENERATOR(generator) #define PROPAGATE_SPAN_TO_GENERATOR(generator) -#define WRAP_ASYNC_GENERATOR(generator, name) +#define WRAP_ASYNC_GENERATOR(generator) +#define WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, name) #endif From 7330f1414a2a31ca0ccf7a076bc4b92644eda61d Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 4 Apr 2022 17:24:16 +0200 Subject: [PATCH 40/42] Removed some left-over code --- cpp/src/arrow/dataset/file_ipc.cc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 260dd6278ce..7c45a5d7056 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -137,10 +137,6 @@ Result> IpcFileFormat::Inspect(const FileSource& source) Result IpcFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const { -#ifdef ARROW_WITH_OPENTELEMETRY - auto tracer = arrow::internal::tracing::GetTracer(); - auto parent_span = tracer->GetCurrentSpan(); -#endif auto self = shared_from_this(); auto source = file->source(); auto open_reader = OpenReaderAsync(source); From df2a6be2781a1a415547d99c0a2449dc9cb23119 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 4 Apr 2022 17:24:39 +0200 Subject: [PATCH 41/42] Small updates to otel helper function documentation --- cpp/src/arrow/util/tracing_internal.h | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 43a8da0f2f5..f943c03e995 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -62,10 +62,8 @@ inline Result MarkSpan(Result result, opentelemetry::trace::Span* span) { return result; } -/// \brief Start a new span for each invocation of a generator. -/// -/// The parent span of the new span will be the currently active span -/// (if any) as of when WrapAsyncGenerator was itself called. +/// \brief Tie the current span to a generator, ending it when the generator finishes. +/// Optionally start a child span for each invocation. template AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, const std::string& span_name = "", @@ -98,6 +96,9 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator( } /// \brief Propagate the currently active span to each invocation of an async generator. +/// +/// This prevents spans, created when running generator instances asynchronously, +/// ending up in a separate, disconnected trace. template AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) { auto span = GetTracer()->GetCurrentSpan(); @@ -105,14 +106,6 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); } -/* - * PropagateSpanThroughAsyncGenerator - only makes sure the trace context is propagated, - * so that spans created when running generator instances asynchronously do not - * end up in a separate, disconnected trace. - * WrapAsyncGenerator - Connect the current span to the generator. When the generator - * finishes, it will stop the span. Optionally, create child spans at each invocation. - */ - class SpanImpl { public: opentelemetry::nostd::shared_ptr span; From 567fa3004dfad105c6d9af1929bf231052f167f9 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 4 Apr 2022 17:35:10 +0200 Subject: [PATCH 42/42] Adding a comment about the use of macros for helper functions --- cpp/src/arrow/util/tracing_internal.h | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index f943c03e995..77320eb2aec 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -162,7 +162,16 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( generator = \ ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name, true) -#else +/* + * Calls to the helper macros above are removed by the preprocessor when building + * without opentelemetry, because of the empty definitions below. + * Without them, every call to a helper function would need to be surrounded with + * #ifdef ARROW_WITH_OPENTELEMETRY + * ... + * #endif + */ + +#else // !ARROW_WITH_OPENTELEMETRY class SpanImpl {};