Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cpp/build-support/lsan-suppressions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,10 @@
leak:*__new_exitfn*
# Leak at shutdown in OpenSSL
leak:CRYPTO_zalloc

# OpenTelemetry. These seem like false positives and go away if the
# CPU thread pool is manually shut down before exit.
# Note that ASan has trouble backtracing these and may not be able to
# without LSAN_OPTIONS=fast_unwind_on_malloc=0:malloc_context_size=100
leak:opentelemetry::v1::context::ThreadLocalContextStorage::GetStack
leak:opentelemetry::v1::context::ThreadLocalContextStorage::Stack::Resize
9 changes: 8 additions & 1 deletion cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "arrow/util/map.h"
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/variant.h"

namespace arrow {
Expand Down Expand Up @@ -159,7 +160,13 @@ Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
}
std::shared_ptr<State> state;
};
return Generator{std::make_shared<State>(scan_options, std::move(scan_task_it))};
RecordBatchGenerator generator =
Generator{std::make_shared<State>(scan_options, std::move(scan_task_it))};
#ifdef ARROW_WITH_OPENTELEMETRY
generator = arrow::internal::tracing::WrapAsyncGenerator(
std::move(generator), "arrow::dataset::FileFormat::ScanBatchesAsync::Next");
#endif
return generator;
}

Result<std::shared_ptr<Schema>> FileFragment::ReadPhysicalSchemaImpl() {
Expand Down
32 changes: 26 additions & 6 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/utf8.h"

namespace arrow {
Expand Down Expand Up @@ -148,9 +149,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
Comment on lines +152 to +155
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we push this ifdef into StartSpan by returning a dummy span object with no-op methods?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. I didn't want to wrap too much of the API, also, I figured this would be best if people were very concerned about overhead.

ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically the GetReadOptions call could fail and it would bail out of the method without marking the span as finished. I'm not sure there is any easy and good solution though since we aren't using exceptions. Any ideas?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The span will be marked as finished in its destructor by default. Explicit marking is only required when you want to control the end time: https://github.com/open-telemetry/opentelemetry-cpp/blob/f20f72f3a904b215fc750b67b206f158aeb61241/sdk/src/trace/span.cc#L89-L92

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense!


ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
auto path = source.path();
ARROW_ASSIGN_OR_RAISE(
input, io::BufferedInputStream::Create(reader_options.block_size,
default_memory_pool(), std::move(input)));
Expand All @@ -171,11 +177,20 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
}));
return reader_fut.Then(
// Adds the filename to the error
[](const std::shared_ptr<csv::StreamingReader>& reader)
-> Result<std::shared_ptr<csv::StreamingReader>> { return reader; },
[source](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
return err.WithMessage("Could not open CSV input source '", source.path(),
"': ", err);
[=](const std::shared_ptr<csv::StreamingReader>& reader)
-> Result<std::shared_ptr<csv::StreamingReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return reader;
},
[=](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(err, span.get());
span->End();
#endif
return err.WithMessage("Could not open CSV input source '", path, "': ", err);
});
}

Expand Down Expand Up @@ -276,7 +291,12 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
auto source = file->source();
auto reader_fut =
OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool());
return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
#ifdef ARROW_WITH_OPENTELEMETRY
generator = arrow::internal::tracing::WrapAsyncGenerator(
std::move(generator), "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
#endif
return generator;
}

Future<util::optional<int64_t>> CsvFileFormat::CountRows(
Expand Down
41 changes: 34 additions & 7 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

Expand Down Expand Up @@ -62,16 +63,31 @@ static inline Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
static inline Future<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReaderAsync(
const FileSource& source,
const ipc::IpcReadOptions& options = default_read_options()) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::IpcFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
auto path = source.path();
return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options)
.Then([](const std::shared_ptr<ipc::RecordBatchFileReader>& reader)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> { return reader; },
[path](const Status& status)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
return status.WithMessage("Could not open IPC input source '", path,
"': ", status.message());
});
.Then(
[=](const std::shared_ptr<ipc::RecordBatchFileReader>& reader)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return reader;
},
[=](const Status& status)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(status, span.get());
span->End();
#endif
return status.WithMessage("Could not open IPC input source '", path,
"': ", status.message());
});
}

static inline Result<std::vector<int>> GetIncludedFields(
Expand Down Expand Up @@ -209,6 +225,10 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
Result<RecordBatchGenerator> IpcFileFormat::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto parent_span = tracer->GetCurrentSpan();
#endif
auto self = shared_from_this();
auto source = file->source();
auto open_reader = OpenReaderAsync(source);
Expand Down Expand Up @@ -239,6 +259,13 @@ Result<RecordBatchGenerator> IpcFileFormat::ScanBatchesAsync(
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(
/*coalesce=*/false, options->io_context));
}
#ifdef ARROW_WITH_OPENTELEMETRY
opentelemetry::trace::StartSpanOptions span_options;
span_options.parent = parent_span->GetContext();
generator = arrow::internal::tracing::WrapAsyncGenerator(
std::move(generator), std::move(span_options),
"arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next");
#endif
auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level);
return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size);
};
Expand Down
25 changes: 21 additions & 4 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
Expand Down Expand Up @@ -360,6 +361,10 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader

Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::ParquetFileFormat::GetReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
Expand Down Expand Up @@ -392,10 +397,17 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
&arrow_reader));
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return std::move(arrow_reader);
},
[path](
const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
[=](const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(status, span.get());
span->End();
#endif
return WrapSourceError(status, path);
});
}
Expand Down Expand Up @@ -498,8 +510,13 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
::arrow::internal::GetCpuThreadPool(), row_group_readahead));
return generator;
};
return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
.Then(std::move(make_generator)));
auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
.Then(std::move(make_generator)));
#ifdef ARROW_WITH_OPENTELEMETRY
generator = arrow::internal::tracing::WrapAsyncGenerator(
std::move(generator), "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next");
#endif
return generator;
}

Future<util::optional<int64_t>> ParquetFileFormat::CountRows(
Expand Down
32 changes: 28 additions & 4 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
#include "arrow/dataset/scanner_internal.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/config.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

Expand Down Expand Up @@ -453,6 +455,18 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this<AsyncSc
Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
const Enumerated<std::shared_ptr<Fragment>>& fragment,
const std::shared_ptr<ScanOptions>& options) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan(
"arrow::dataset::FragmentToBatches",
{
{"arrow.dataset.fragment", fragment.value->ToString()},
{"arrow.dataset.fragment.index", fragment.index},
{"arrow.dataset.fragment.last", fragment.last},
{"arrow.dataset.fragment.type_name", fragment.value->type_name()},
});
auto scope = tracer->WithActiveSpan(span);
#endif
ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options));
ArrayVector columns;
for (const auto& field : options->dataset_schema->fields()) {
Expand All @@ -461,6 +475,10 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
MakeArrayOfNull(field->type(), /*length=*/0, options->pool));
columns.push_back(std::move(array));
}
#ifdef ARROW_WITH_OPENTELEMETRY
batch_gen =
arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(batch_gen), span);
#endif
batch_gen = MakeDefaultIfEmptyGenerator(
std::move(batch_gen),
RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns)));
Expand All @@ -477,10 +495,16 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options) {
auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
return MakeMappedGenerator(std::move(enumerated_fragment_gen),
[=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
return FragmentToBatches(fragment, options);
});
auto batch_gen_gen =
MakeMappedGenerator(std::move(enumerated_fragment_gen),
[=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
return FragmentToBatches(fragment, options);
});
#ifdef ARROW_WITH_OPENTELEMETRY
batch_gen_gen = arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(
std::move(batch_gen_gen));
#endif
return batch_gen_gen;
}

const FieldVector kAugmentedFields{
Expand Down
55 changes: 54 additions & 1 deletion cpp/src/arrow/util/tracing_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ Iterator<T> WrapIterator(

template <typename T>
AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
opentelemetry::trace::StartSpanOptions options,
const std::string& span_name) {
return [=]() mutable -> Future<T> {
auto span = GetTracer()->StartSpan(span_name);
auto span = GetTracer()->StartSpan(span_name, {}, options);
auto scope = GetTracer()->WithActiveSpan(span);
auto fut = wrapped();
fut.AddCallback([span](const Result<T>& result) {
Expand All @@ -97,6 +98,58 @@ AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
return fut;
};
}

/// \brief Start a new span for each invocation of a generator.
///
/// The parent span of the new span will be the currently active span
/// (if any) as of when WrapAsyncGenerator was itself called.
template <typename T>
AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
const std::string& span_name) {
opentelemetry::trace::StartSpanOptions options;
options.parent = GetTracer()->GetCurrentSpan()->GetContext();
return WrapAsyncGenerator(std::move(wrapped), std::move(options), span_name);
}

/// \brief End the given span when the given async generator ends.
///
/// The span will be made the active span each time the generator is called.
template <typename T>
AsyncGenerator<T> TieSpanToAsyncGenerator(
AsyncGenerator<T> wrapped,
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span) {
return [=]() mutable -> Future<T> {
auto scope = GetTracer()->WithActiveSpan(span);
return wrapped().Then(
[span](const T& result) -> Result<T> {
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
return result;
},
[span](const Status& status) -> Result<T> {
MarkSpan(status, span.get());
return status;
});
};
}

/// \brief Activate the given span on each invocation of an async generator.
template <typename T>
AsyncGenerator<T> PropagateSpanThroughAsyncGenerator(
AsyncGenerator<T> wrapped,
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span) {
return [=]() mutable -> Future<T> {
auto scope = GetTracer()->WithActiveSpan(span);
return wrapped();
};
}
Comment on lines +136 to +144
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second glance this helper method seems a little off to me. Is the "active span" a thread local concept? Will this work even if wrapped() launches its task on a separate thread (e.g. an I/O operation)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, active span is thread local. wrapped() must manually propagate the active span (or manually pass the span through) if it itself spawns a thread. That is a disadvantage and it does make it hard to use OpenTelemetry while making it possible to completely remove it at compile time.

One way to get around this would be to instrument the Executor and possibly Future classes themselves, but I worry this would have more overhead than is desirable. (Or maybe not. I haven't tried.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concretely I'm thinking of...

#ifdef ARROW_WITH_OPENTELEMETRY
  batch_gen_gen = arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(
      std::move(batch_gen_gen));
#endif

If you are I/O bound then I would expect batch_gen_gen will be transferring to an I/O thread (and back) for every item. There are "async-local" concepts (e.g. https://docs.microsoft.com/en-us/dotnet/api/system.threading.asynclocal-1?view=net-6.0) so maybe we need to adopt something like that. I think that's the same thing as "instrumenting the executor and possibly future classes themselves". I think it would be fairly affordable (submitting a thread task would have to copy a handle to the active span or "async context" to include as part of the task and then the first thing in the task would be setting the active span based on that handle).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think that's essentially the same (OpenTelemetry maintains a context which is thread-local by default, I think it can even be swapped out depending on how we want to go about things?). I'll try to take a look at this approach when I get a chance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be done in a follow-up? I think, at the moment, the consequence would be that spans don't have proper parentage but other than that it should be fairly harmless.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we can do that. If it pans out we can hopefully replace the manual instrumentation done here. I'll file a JIRA to explore this further.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/// \brief Activate the given span on each invocation of an async generator.
template <typename T>
AsyncGenerator<T> PropagateSpanThroughAsyncGenerator(AsyncGenerator<T> wrapped) {
auto span = GetTracer()->GetCurrentSpan();
if (!span->GetContext().IsValid()) return wrapped;
return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span));
}
#endif

} // namespace tracing
Expand Down
18 changes: 18 additions & 0 deletions cpp/valgrind.supp
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,21 @@
...
fun:*re2*Prog*
}
{
<OpenTelemetry>:Thread locals don't appear to be freed
Memcheck:Leak
...
fun:*opentelemetry*ThreadLocalContextStorage*GetStack*
}
{
<OpenTelemetry>:Thread locals don't appear to be freed
Memcheck:Leak
...
fun:*opentelemetry*ThreadLocalContextStorage*Stack*Resize*
}
{
<OpenTelemetry>:Thread locals don't appear to be freed
Memcheck:Leak
...
fun:_dl_allocate_tls
}