From bcc31f87a4cdccefdd46c24eec45c177efb7b497 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 14 Dec 2021 17:34:19 -0500
Subject: [PATCH 1/5] ARROW-15067: [C++] Add tracing spans to the scanner
---
cpp/src/arrow/dataset/file_base.cc | 9 +++++-
cpp/src/arrow/dataset/file_csv.cc | 32 +++++++++++++++++----
cpp/src/arrow/dataset/file_ipc.cc | 40 ++++++++++++++++++++++-----
cpp/src/arrow/dataset/file_parquet.cc | 25 ++++++++++++++---
cpp/src/arrow/dataset/scanner.cc | 18 ++++++++++++
cpp/src/arrow/util/tracing_internal.h | 35 ++++++++++++++++++++++-
6 files changed, 140 insertions(+), 19 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 2b605b338f7..94a85d6b9bc 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -41,6 +41,7 @@
#include "arrow/util/map.h"
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
+#include "arrow/util/tracing_internal.h"
#include "arrow/util/variant.h"
namespace arrow {
@@ -159,7 +160,13 @@ Result FileFormat::ScanBatchesAsync(
}
std::shared_ptr state;
};
- return Generator{std::make_shared(scan_options, std::move(scan_task_it))};
+ RecordBatchGenerator generator =
+ Generator{std::make_shared(scan_options, std::move(scan_task_it))};
+#ifdef ARROW_WITH_OPENTELEMETRY
+ generator = arrow::internal::tracing::WrapAsyncGenerator(
+ std::move(generator), "arrow::dataset::FileFormat::ScanBatchesAsync::Next");
+#endif
+ return generator;
}
Result> FileFragment::ReadPhysicalSchemaImpl() {
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index a2816cc3e31..c73c73b14d0 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -39,6 +39,7 @@
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
+#include "arrow/util/tracing_internal.h"
#include "arrow/util/utf8.h"
namespace arrow {
@@ -148,9 +149,14 @@ static inline Result GetReadOptions(
static inline Future> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr& scan_options, Executor* cpu_executor) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto tracer = arrow::internal::tracing::GetTracer();
+ auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
+#endif
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
+ auto path = source.path();
ARROW_ASSIGN_OR_RAISE(
input, io::BufferedInputStream::Create(reader_options.block_size,
default_memory_pool(), std::move(input)));
@@ -171,11 +177,20 @@ static inline Future> OpenReaderAsync(
}));
return reader_fut.Then(
// Adds the filename to the error
- [](const std::shared_ptr& reader)
- -> Result> { return reader; },
- [source](const Status& err) -> Result> {
- return err.WithMessage("Could not open CSV input source '", source.path(),
- "': ", err);
+ [=](const std::shared_ptr& reader)
+ -> Result> {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ span->SetStatus(opentelemetry::trace::StatusCode::kOk);
+ span->End();
+#endif
+ return reader;
+ },
+ [=](const Status& err) -> Result> {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ arrow::internal::tracing::MarkSpan(err, span.get());
+ span->End();
+#endif
+ return err.WithMessage("Could not open CSV input source '", path, "': ", err);
});
}
@@ -276,7 +291,12 @@ Result CsvFileFormat::ScanBatchesAsync(
auto source = file->source();
auto reader_fut =
OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool());
- return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
+ auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
+#ifdef ARROW_WITH_OPENTELEMETRY
+ generator = arrow::internal::tracing::WrapAsyncGenerator(
+ std::move(generator), "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
+#endif
+ return generator;
}
Future> CsvFileFormat::CountRows(
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index e01373e79c3..2dfa5ff9c5c 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -62,16 +62,31 @@ static inline Result> OpenReader(
static inline Future> OpenReaderAsync(
const FileSource& source,
const ipc::IpcReadOptions& options = default_read_options()) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto tracer = arrow::internal::tracing::GetTracer();
+ auto span = tracer->StartSpan("arrow::dataset::IpcFileFormat::OpenReaderAsync");
+#endif
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
auto path = source.path();
return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options)
- .Then([](const std::shared_ptr& reader)
- -> Result> { return reader; },
- [path](const Status& status)
- -> Result> {
- return status.WithMessage("Could not open IPC input source '", path,
- "': ", status.message());
- });
+ .Then(
+ [=](const std::shared_ptr& reader)
+ -> Result> {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ span->SetStatus(opentelemetry::trace::StatusCode::kOk);
+ span->End();
+#endif
+ return reader;
+ },
+ [=](const Status& status)
+ -> Result> {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ arrow::internal::tracing::MarkSpan(status, span.get());
+ span->End();
+#endif
+ return status.WithMessage("Could not open IPC input source '", path,
+ "': ", status.message());
+ });
}
static inline Result> GetIncludedFields(
@@ -209,6 +224,10 @@ Result IpcFileFormat::ScanFile(
Result IpcFileFormat::ScanBatchesAsync(
const std::shared_ptr& options,
const std::shared_ptr& file) const {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto tracer = arrow::internal::tracing::GetTracer();
+ auto parent_span = tracer->GetCurrentSpan();
+#endif
auto self = shared_from_this();
auto source = file->source();
auto open_reader = OpenReaderAsync(source);
@@ -239,6 +258,13 @@ Result IpcFileFormat::ScanBatchesAsync(
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(
/*coalesce=*/false, options->io_context));
}
+#ifdef ARROW_WITH_OPENTELEMETRY
+ opentelemetry::trace::StartSpanOptions span_options;
+ span_options.parent = parent_span->GetContext();
+ generator = arrow::internal::tracing::WrapAsyncGenerator(
+ std::move(generator), std::move(span_options),
+ "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next");
+#endif
auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level);
return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size);
};
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index ba9be0c8b66..233eaf69194 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -34,6 +34,7 @@
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
+#include "arrow/util/tracing_internal.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
@@ -360,6 +361,10 @@ Result> ParquetFileFormat::GetReader
Future> ParquetFileFormat::GetReaderAsync(
const FileSource& source, const std::shared_ptr& options) const {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto tracer = arrow::internal::tracing::GetTracer();
+ auto span = tracer->StartSpan("arrow::dataset::ParquetFileFormat::GetReaderAsync");
+#endif
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions(kParquetTypeName, options.get(),
@@ -392,10 +397,17 @@ Future> ParquetFileFormat::GetReader
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
&arrow_reader));
+#ifdef ARROW_WITH_OPENTELEMETRY
+ span->SetStatus(opentelemetry::trace::StatusCode::kOk);
+ span->End();
+#endif
return std::move(arrow_reader);
},
- [path](
- const Status& status) -> Result> {
+ [=](const Status& status) -> Result> {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ arrow::internal::tracing::MarkSpan(status, span.get());
+ span->End();
+#endif
return WrapSourceError(status, path);
});
}
@@ -498,8 +510,13 @@ Result ParquetFileFormat::ScanBatchesAsync(
::arrow::internal::GetCpuThreadPool(), row_group_readahead));
return generator;
};
- return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
- .Then(std::move(make_generator)));
+ auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
+ .Then(std::move(make_generator)));
+#ifdef ARROW_WITH_OPENTELEMETRY
+ generator = arrow::internal::tracing::WrapAsyncGenerator(
+ std::move(generator), "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next");
+#endif
+ return generator;
}
Future> ParquetFileFormat::CountRows(
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 23942ec37da..e872db10ec7 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -35,10 +35,12 @@
#include "arrow/dataset/scanner_internal.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
+#include "arrow/util/config.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
+#include "arrow/util/tracing_internal.h"
namespace arrow {
@@ -453,6 +455,18 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this FragmentToBatches(
const Enumerated>& fragment,
const std::shared_ptr& options) {
+#ifdef ARROW_WITH_OPENTELEMETRY
+ auto tracer = arrow::internal::tracing::GetTracer();
+ auto span = tracer->StartSpan(
+ "arrow::dataset::FragmentToBatches",
+ {
+ {"arrow.dataset.fragment", fragment.value->ToString()},
+ {"arrow.dataset.fragment.index", fragment.index},
+ {"arrow.dataset.fragment.last", fragment.last},
+ {"arrow.dataset.fragment.type_name", fragment.value->type_name()},
+ });
+ auto scope = tracer->WithActiveSpan(span);
+#endif
ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options));
ArrayVector columns;
for (const auto& field : options->dataset_schema->fields()) {
@@ -461,6 +475,10 @@ Result FragmentToBatches(
MakeArrayOfNull(field->type(), /*length=*/0, options->pool));
columns.push_back(std::move(array));
}
+#ifdef ARROW_WITH_OPENTELEMETRY
+ batch_gen =
+ arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(batch_gen), span);
+#endif
batch_gen = MakeDefaultIfEmptyGenerator(
std::move(batch_gen),
RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns)));
diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h
index f3ca5e5ce3d..67e79617cad 100644
--- a/cpp/src/arrow/util/tracing_internal.h
+++ b/cpp/src/arrow/util/tracing_internal.h
@@ -85,9 +85,10 @@ Iterator WrapIterator(
template
AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped,
+ opentelemetry::trace::StartSpanOptions options,
const std::string& span_name) {
return [=]() mutable -> Future {
- auto span = GetTracer()->StartSpan(span_name);
+ auto span = GetTracer()->StartSpan(span_name, {}, options);
auto scope = GetTracer()->WithActiveSpan(span);
auto fut = wrapped();
fut.AddCallback([span](const Result& result) {
@@ -97,6 +98,38 @@ AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped,
return fut;
};
}
+
+/// \brief Start a new span for each invocation of a generator.
+///
+/// The parent span of the new span will be the currently active span
+/// (if any) as of when WrapAsyncGenerator was itself called.
+template
+AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped,
+ const std::string& span_name) {
+ opentelemetry::trace::StartSpanOptions options;
+ options.parent = GetTracer()->GetCurrentSpan()->GetContext();
+ return WrapAsyncGenerator(std::move(wrapped), std::move(options), span_name);
+}
+
+/// \brief End the given span when the given async generator ends.
+///
+/// The span will be made the active span each time the generator is called.
+template
+AsyncGenerator TieSpanToAsyncGenerator(
+ AsyncGenerator wrapped,
+ opentelemetry::nostd::shared_ptr span) {
+ return [=]() mutable -> Future {
+ auto scope = GetTracer()->WithActiveSpan(span);
+ auto fut = wrapped();
+ fut.AddCallback([span](const Result& result) {
+ if (!result.ok() || IsIterationEnd(*result)) {
+ MarkSpan(result.status(), span.get());
+ span->End();
+ }
+ });
+ return fut;
+ };
+}
#endif
} // namespace tracing
From 751538147aa14055bf27ead45e6489e0e35bbe5c Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 15 Dec 2021 13:37:49 -0500
Subject: [PATCH 2/5] ARROW-15067: [C++] Add ASAN suppressions for OTel
---
cpp/build-support/lsan-suppressions.txt | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/cpp/build-support/lsan-suppressions.txt b/cpp/build-support/lsan-suppressions.txt
index 566857a9c09..a8918e10d94 100644
--- a/cpp/build-support/lsan-suppressions.txt
+++ b/cpp/build-support/lsan-suppressions.txt
@@ -19,3 +19,10 @@
leak:*__new_exitfn*
# Leak at shutdown in OpenSSL
leak:CRYPTO_zalloc
+
+# OpenTelemetry. These seem like false positives and go away if the
+# CPU thread pool is manually shut down before exit.
+# Note that ASan has trouble backtracing these leaks; a usable backtrace
+# may require LSAN_OPTIONS=fast_unwind_on_malloc=0:malloc_context_size=100
+leak:opentelemetry::v1::context::ThreadLocalContextStorage::GetStack
+leak:opentelemetry::v1::context::ThreadLocalContextStorage::Stack::Resize
From 0c6af07db2473bb2731d8138b1894db204810ee2 Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 15 Dec 2021 13:49:35 -0500
Subject: [PATCH 3/5] ARROW-15067: [C++] Add Valgrind suppressions for OTel
---
cpp/valgrind.supp | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/cpp/valgrind.supp b/cpp/valgrind.supp
index 8d2d5da904b..2b9959136a8 100644
--- a/cpp/valgrind.supp
+++ b/cpp/valgrind.supp
@@ -51,3 +51,21 @@
...
fun:*re2*Prog*
}
+{
+ :Thread locals don't appear to be freed
+ Memcheck:Leak
+ ...
+ fun:*opentelemetry*ThreadLocalContextStorage*GetStack*
+}
+{
+ :Thread locals don't appear to be freed
+ Memcheck:Leak
+ ...
+ fun:*opentelemetry*ThreadLocalContextStorage*Stack*Resize*
+}
+{
+ :Thread locals don't appear to be freed
+ Memcheck:Leak
+ ...
+ fun:_dl_allocate_tls
+}
From 27e5ec16ba7cbfef02691fb135e18779d64f794d Mon Sep 17 00:00:00 2001
From: David Li
Date: Wed, 15 Dec 2021 14:42:56 -0500
Subject: [PATCH 4/5] ARROW-15067: [C++] Ensure spans propagate through call to
scanner
---
cpp/src/arrow/dataset/file_ipc.cc | 1 +
cpp/src/arrow/dataset/scanner.cc | 14 ++++++++++----
cpp/src/arrow/util/tracing_internal.h | 19 +++++++++++++++++++
3 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index 2dfa5ff9c5c..181418c9d46 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -30,6 +30,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
+#include "arrow/util/tracing_internal.h"
namespace arrow {
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index e872db10ec7..bed23df27d5 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -495,10 +495,16 @@ Result FragmentToBatches(
Result> FragmentsToBatches(
FragmentGenerator fragment_gen, const std::shared_ptr& options) {
auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
- return MakeMappedGenerator(std::move(enumerated_fragment_gen),
- [=](const Enumerated>& fragment) {
- return FragmentToBatches(fragment, options);
- });
+ auto batch_gen_gen =
+ MakeMappedGenerator(std::move(enumerated_fragment_gen),
+ [=](const Enumerated>& fragment) {
+ return FragmentToBatches(fragment, options);
+ });
+#ifdef ARROW_WITH_OPENTELEMETRY
+ batch_gen_gen = arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(
+ std::move(batch_gen_gen));
+#endif
+ return batch_gen_gen;
}
const FieldVector kAugmentedFields{
diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h
index 67e79617cad..2b75ead7324 100644
--- a/cpp/src/arrow/util/tracing_internal.h
+++ b/cpp/src/arrow/util/tracing_internal.h
@@ -130,6 +130,25 @@ AsyncGenerator TieSpanToAsyncGenerator(
return fut;
};
}
+
+/// \brief Activate the given span on each invocation of an async generator.
+template
+AsyncGenerator PropagateSpanThroughAsyncGenerator(
+ AsyncGenerator wrapped,
+ opentelemetry::nostd::shared_ptr span) {
+ return [=]() mutable -> Future {
+ auto scope = GetTracer()->WithActiveSpan(span);
+ return wrapped();
+ };
+}
+
+/// \brief Activate the span that is current at wrap time on each generator invocation.
+template
+AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) {
+ auto span = GetTracer()->GetCurrentSpan();
+ if (!span->GetContext().IsValid()) return wrapped;  // no valid span: return unwrapped
+ return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span));
+}
#endif
} // namespace tracing
From d73e4ad552bb8fc389268f9f450af8db5bf55d7b Mon Sep 17 00:00:00 2001
From: David Li
Date: Mon, 20 Dec 2021 09:30:34 -0500
Subject: [PATCH 5/5] ARROW-15044: [C++] Use Then()
---
cpp/src/arrow/util/tracing_internal.h | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h
index 2b75ead7324..083398332d1 100644
--- a/cpp/src/arrow/util/tracing_internal.h
+++ b/cpp/src/arrow/util/tracing_internal.h
@@ -120,14 +120,15 @@ AsyncGenerator TieSpanToAsyncGenerator(
opentelemetry::nostd::shared_ptr span) {
return [=]() mutable -> Future {
auto scope = GetTracer()->WithActiveSpan(span);
- auto fut = wrapped();
- fut.AddCallback([span](const Result& result) {
- if (!result.ok() || IsIterationEnd(*result)) {
- MarkSpan(result.status(), span.get());
- span->End();
- }
- });
- return fut;
+ return wrapped().Then(
+ [span](const T& result) -> Result {
+ span->SetStatus(opentelemetry::trace::StatusCode::kOk);
+ return result;
+ },
+ [span](const Status& status) -> Result {
+ MarkSpan(status, span.get());
+ return status;
+ });
};
}