diff --git a/cpp/build-support/lsan-suppressions.txt b/cpp/build-support/lsan-suppressions.txt index 566857a9c09..a8918e10d94 100644 --- a/cpp/build-support/lsan-suppressions.txt +++ b/cpp/build-support/lsan-suppressions.txt @@ -19,3 +19,10 @@ leak:*__new_exitfn* # Leak at shutdown in OpenSSL leak:CRYPTO_zalloc + +# OpenTelemetry. These seem like false positives and go away if the +# CPU thread pool is manually shut down before exit. +# Note that ASan has trouble backtracing these and may not be able to +# without LSAN_OPTIONS=fast_unwind_on_malloc=0:malloc_context_size=100 +leak:opentelemetry::v1::context::ThreadLocalContextStorage::GetStack +leak:opentelemetry::v1::context::ThreadLocalContextStorage::Stack::Resize diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 1cc7957083f..277bab29a09 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -39,6 +39,7 @@ #include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/utf8.h" namespace arrow { @@ -167,9 +168,14 @@ static inline Result GetReadOptions( static inline Future> OpenReaderAsync( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options, Executor* cpu_executor) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync"); +#endif ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); + const auto& path = source.path(); ARROW_ASSIGN_OR_RAISE( input, io::BufferedInputStream::Create(reader_options.block_size, default_memory_pool(), std::move(input))); @@ -190,11 +196,20 @@ static inline Future> OpenReaderAsync( })); return reader_fut.Then( // Adds the filename to the error - [](const std::shared_ptr& reader) - -> Result> { return reader; }, - [source](const Status& err) -> Result> { - return err.WithMessage("Could not open CSV input source '", source.path(), - "': ", err); + [=](const std::shared_ptr& reader) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); +#endif + return reader; + }, + [=](const Status& err) -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + arrow::internal::tracing::MarkSpan(err, span.get()); + span->End(); +#endif + return err.WithMessage("Could not open CSV input source '", path, "': ", err); }); } @@ -250,7 +265,10 @@ Result CsvFileFormat::ScanBatchesAsync( auto source = file->source(); auto reader_fut = OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool()); - return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); + auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN( + generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next"); + return generator; } Future> CsvFileFormat::CountRows( diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index e386c7dea8d..7c45a5d7056 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -30,6 +30,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -62,16 +63,31 @@ static inline Result> OpenReader( static inline Future> OpenReaderAsync( const FileSource& source, const ipc::IpcReadOptions& options = default_read_options()) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan("arrow::dataset::IpcFileFormat::OpenReaderAsync"); +#endif ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); auto path = source.path(); return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options) - .Then([](const std::shared_ptr& reader) - -> Result> { return reader; }, - [path](const Status& status) - -> Result> { - return status.WithMessage("Could not open IPC input source '", path, - "': ", status.message()); - }); + .Then( + [=](const std::shared_ptr& reader) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + span->End(); +#endif + return reader; + }, + [=](const Status& status) + -> Result> { +#ifdef ARROW_WITH_OPENTELEMETRY + arrow::internal::tracing::MarkSpan(status, span.get()); + span->End(); +#endif + return status.WithMessage("Could not open IPC input source '", path, + "': ", status.message()); + }); } static inline Result> GetIncludedFields( @@ -151,6 +167,8 @@ Result IpcFileFormat::ScanBatchesAsync( ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator( /*coalesce=*/false, options->io_context)); } + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN( + generator, "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next"); auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level); return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size); }; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 5fbd457eccd..4a8d4093124 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -34,6 +34,7 @@ #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/range.h" +#include "arrow/util/tracing_internal.h" #include "parquet/arrow/reader.h" #include "parquet/arrow/schema.h" #include "parquet/arrow/writer.h" @@ -415,8 +416,11 @@ Result ParquetFileFormat::ScanBatchesAsync( ::arrow::internal::GetCpuThreadPool(), row_group_readahead)); return generator; }; - return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) - .Then(std::move(make_generator))); + auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options) + .Then(std::move(make_generator))); + WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN( + generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next"); + return generator; } Future> ParquetFileFormat::CountRows( diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index b958f7b9e62..ab8026d5ab2 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -35,10 +35,12 @@ #include "arrow/dataset/plan.h" #include "arrow/table.h" #include "arrow/util/async_generator.h" +#include "arrow/util/config.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -206,6 +208,18 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this FragmentToBatches( const Enumerated>& fragment, const std::shared_ptr& options) { +#ifdef ARROW_WITH_OPENTELEMETRY + auto tracer = arrow::internal::tracing::GetTracer(); + auto span = tracer->StartSpan( + "arrow::dataset::FragmentToBatches", + { + {"arrow.dataset.fragment", fragment.value->ToString()}, + {"arrow.dataset.fragment.index", fragment.index}, + {"arrow.dataset.fragment.last", fragment.last}, + {"arrow.dataset.fragment.type_name", fragment.value->type_name()}, + }); + auto scope = tracer->WithActiveSpan(span); +#endif ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options)); ArrayVector columns; for (const auto& field : options->dataset_schema->fields()) { @@ -214,6 +228,7 @@ Result FragmentToBatches( MakeArrayOfNull(field->type(), /*length=*/0, options->pool)); columns.push_back(std::move(array)); } + WRAP_ASYNC_GENERATOR(batch_gen); batch_gen = MakeDefaultIfEmptyGenerator( std::move(batch_gen), RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns))); @@ -230,10 +245,13 @@ Result FragmentToBatches( Result> FragmentsToBatches( FragmentGenerator fragment_gen, const std::shared_ptr& options) { auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen)); - return MakeMappedGenerator(std::move(enumerated_fragment_gen), - [=](const Enumerated>& fragment) { - return FragmentToBatches(fragment, options); - }); + auto batch_gen_gen = + MakeMappedGenerator(std::move(enumerated_fragment_gen), + [=](const Enumerated>& fragment) { + return FragmentToBatches(fragment, options); + }); + PROPAGATE_SPAN_TO_GENERATOR(std::move(batch_gen_gen)); + return batch_gen_gen; } class OneShotFragment : public Fragment { diff --git a/cpp/src/arrow/util/future.cc b/cpp/src/arrow/util/future.cc index c398d992861..ca4290c5b08 100644 --- a/cpp/src/arrow/util/future.cc +++ b/cpp/src/arrow/util/future.cc @@ -27,6 +27,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -242,6 +243,19 @@ class ConcreteFutureImpl : public FutureImpl { void AddCallback(Callback callback, CallbackOptions opts) { CheckOptions(opts); std::unique_lock lock(mutex_); +#ifdef ARROW_WITH_OPENTELEMETRY + struct SpanWrapper { + void operator()(const FutureImpl& impl) { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); + std::move(func)(impl); + } + Callback func; + opentelemetry::nostd::shared_ptr active_span; + }; + SpanWrapper wrapper{std::move(callback), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + callback = std::move(wrapper); +#endif CallbackRecord callback_record{std::move(callback), opts}; if (IsFutureFinished(state_)) { lock.unlock(); diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index a1387947e3a..8a27e171943 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -30,6 +30,8 @@ #include "arrow/util/logging.h" #include "arrow/util/mutex.h" +#include "arrow/util/tracing_internal.h" + namespace arrow { namespace internal { @@ -58,6 +60,20 @@ SerialExecutor::~SerialExecutor() = default; Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce task, StopToken stop_token, StopCallback&& stop_callback) { +#ifdef ARROW_WITH_OPENTELEMETRY + // Wrap the task to propagate a parent tracing span to it + struct SpanWrapper { + void operator()() { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); + std::move(func)(); + } + FnOnce func; + opentelemetry::nostd::shared_ptr active_span; + }; + SpanWrapper wrapper{std::move(task), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + task = std::move(wrapper); +#endif // While the SerialExecutor runs tasks synchronously on its main thread, // SpawnReal may be called from external threads (e.g. when transferring back // from blocking I/O threads), so we need to keep the state alive *and* to @@ -366,6 +382,19 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto // We can still spin up more workers so spin up a new worker LaunchWorkersUnlocked(/*threads=*/1); } +#ifdef ARROW_WITH_OPENTELEMETRY + // Wrap the task to propagate a parent tracing span to it + struct { + void operator()() { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); + std::move(func)(); + } + FnOnce func; + opentelemetry::nostd::shared_ptr activeSpan; + } wrapper{std::forward>(task), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + task = std::move(wrapper); +#endif state_->pending_tasks_.push_back( {std::move(task), std::move(stop_token), std::move(stop_callback)}); } diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index d4947ac88fe..77320eb2aec 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -62,37 +62,20 @@ inline Result MarkSpan(Result result, opentelemetry::trace::Span* span) { return result; } -template -Iterator WrapIterator( - Iterator wrapped, - opentelemetry::nostd::shared_ptr parent_span, - const std::string& span_name) { - struct { - Result operator()() { - opentelemetry::trace::StartSpanOptions options; - options.parent = parent_span->GetContext(); - auto span = GetTracer()->StartSpan(span_name, options); - auto scope = GetTracer()->WithActiveSpan(span); - return wrapped.Next(); - } - - Iterator wrapped; - opentelemetry::nostd::shared_ptr parent_span; - std::string span_name; - } Wrapper; - Wrapper.wrapped = std::move(wrapped); - Wrapper.parent_span = std::move(parent_span); - Wrapper.span_name = span_name; - return MakeFunctionIterator(std::move(Wrapper)); -} - +/// \brief Tie the current span to a generator, ending it when the generator finishes. +/// Optionally start a child span for each invocation. template AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, - const std::string& span_name) { + const std::string& span_name = "", + bool create_childspan = false) { + auto active_span = GetTracer()->GetCurrentSpan(); return [=]() mutable -> Future { - auto span = GetTracer()->StartSpan(span_name); - auto scope = GetTracer()->WithActiveSpan(span); + auto span = active_span; + auto scope = GetTracer()->WithActiveSpan(active_span); auto fut = wrapped(); + if (create_childspan) { + span = GetTracer()->StartSpan(span_name); + } fut.AddCallback([span](const Result& result) { MarkSpan(result.status(), span.get()); span->End(); @@ -101,6 +84,28 @@ AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, }; } +/// \brief Propagate the given span to each invocation of an async generator. +template +AsyncGenerator PropagateSpanThroughAsyncGenerator( + AsyncGenerator wrapped, + opentelemetry::nostd::shared_ptr span) { + return [=]() mutable -> Future { + auto scope = GetTracer()->WithActiveSpan(span); + return wrapped(); + }; +} + +/// \brief Propagate the currently active span to each invocation of an async generator. +/// +/// This prevents spans, created when running generator instances asynchronously, +/// ending up in a separate, disconnected trace. +template +AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) { + auto span = GetTracer()->GetCurrentSpan(); + if (!span->GetContext().IsValid()) return wrapped; + return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); +} + class SpanImpl { public: opentelemetry::nostd::shared_ptr span; @@ -146,7 +151,27 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( return st; \ }) -#else +#define PROPAGATE_SPAN_TO_GENERATOR(generator) \ + generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( \ + std::move(generator)) + +#define WRAP_ASYNC_GENERATOR(generator) \ + generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator)) + +#define WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, name) \ + generator = \ + ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name, true) + +/* + * Calls to the helper macros above are removed by the preprocessor when building + * without opentelemetry, because of the empty definitions below. + * Without them, every call to a helper function would need to be surrounded with + * #ifdef ARROW_WITH_OPENTELEMETRY + * ... + * #endif + */ + +#else // !ARROW_WITH_OPENTELEMETRY class SpanImpl {}; @@ -156,6 +181,9 @@ class SpanImpl {}; #define EVENT(target_span, ...) #define END_SPAN(target_span) #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) +#define PROPAGATE_SPAN_TO_GENERATOR(generator) +#define WRAP_ASYNC_GENERATOR(generator) +#define WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, name) #endif diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 3b0b622e925..bc6754703d6 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -38,6 +38,7 @@ #include "arrow/util/make_unique.h" #include "arrow/util/parallel.h" #include "arrow/util/range.h" +#include "arrow/util/tracing_internal.h" #include "parquet/arrow/reader_internal.h" #include "parquet/column_reader.h" #include "parquet/exception.h" @@ -270,6 +271,17 @@ class FileReaderImpl : public FileReader { records_to_read += reader_->metadata()->RowGroup(row_group)->ColumnChunk(i)->num_values(); } +#ifdef ARROW_WITH_OPENTELEMETRY + std::string column_name = reader_->metadata()->schema()->Column(i)->name(); + std::string phys_type = + TypeToString(reader_->metadata()->schema()->Column(i)->physical_type()); + ::arrow::util::tracing::Span span; + START_SPAN(span, "parquet::arrow::read_column", + {{"parquet.arrow.columnindex", i}, + {"parquet.arrow.columnname", column_name}, + {"parquet.arrow.physicaltype", phys_type}, + {"parquet.arrow.records_to_read", records_to_read}}); +#endif return reader->NextBatch(records_to_read, out); END_PARQUET_CATCH_EXCEPTIONS } @@ -1125,6 +1137,7 @@ FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr reader, row_group_generator = ::arrow::MakeReadaheadGenerator(std::move(row_group_generator), row_group_readahead); } + WRAP_ASYNC_GENERATOR(std::move(row_group_generator)); return ::arrow::MakeConcatenatedGenerator(std::move(row_group_generator)); } diff --git a/cpp/valgrind.supp b/cpp/valgrind.supp index 8d2d5da904b..2b9959136a8 100644 --- a/cpp/valgrind.supp +++ b/cpp/valgrind.supp @@ -51,3 +51,21 @@ ... fun:*re2*Prog* } +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:*opentelemetry*ThreadLocalContextStorage*GetStack* +} +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:*opentelemetry*ThreadLocalContextStorage*Stack*Resize* +} +{ + :Thread locals don't appear to be freed + Memcheck:Leak + ... + fun:_dl_allocate_tls +}