-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-15067: [C++] Add tracing spans to the scanner #12609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general the original PR was done before we had the nice utilities in tracing_internal.h so things here can be cleaned up using those helpers, IMO. But otherwise things look good.
|
|
||
| # OpenTelemetry. These seem like false positives and go away if the | ||
| # CPU thread pool is manually shut down before exit. | ||
| # Note that ASan has trouble backtracing these and may not be able to | ||
| # without LSAN_OPTIONS=fast_unwind_on_malloc=0:malloc_context_size=100 | ||
| leak:opentelemetry::v1::context::ThreadLocalContextStorage::GetStack | ||
| leak:opentelemetry::v1::context::ThreadLocalContextStorage::Stack::Resize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably test and ensure we don't get issues anymore once the new version of OTel comes out.
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for looking into this. I think this is probably more accurate than what we had before but I do think we should probably address a more general solution too. This is getting pretty confusing.
I'm not sure it would require changes to Future but rather to Executor. Here is what I'm thinking (though I'm usually missing some key detail on these kinds of things 😆 )
- When a span is created it is made the active span automatically (perhaps there can be an exception where we don't want this but I'm not aware of one)
- When a span is created the parent should always be set to the active span
- In
Executor::SpawnandExecutor::Submitwe should wrap tasks. For example:
template <typename Function>
Status Spawn(Function&& func) {
const auto& span = GetActiveSpan();
struct CallWrapper {
void operator()() {
SetActiveSpan(span);
func();
ClearActiveSpan();
}
Function func;
Span span;
};
CallWrapper wrapped{std::forward(func), span};
return SpawnReal(TaskHints{}, std::move(wrapped), StopToken::Unstoppable(),
StopCallback{});
}
| #define GET_CURRENT_SPAN(span) \ | ||
| auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan() | ||
|
|
||
| #define SET_SPAN_SCOPE(scope, span) \ | ||
| auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span) | ||
|
|
||
| #define TIE_SPAN_TO_GENERATOR(generator) \ | ||
| generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(generator) | ||
|
|
||
| #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ | ||
| generator = \ | ||
| ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(generator) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These macros seem more like aliases. I'm not sure they help more than they obfuscate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Their purpose is to have blank versions when compiling without opentelemetry
#ifdef ARROW_WITH_OPENTELEMETRY
#define GET_CURRENT_SPAN(span) \
auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()
...
#else
#define START_SPAN(target_span, ...)
So if I remove them, I need to guard all of the calls to these functions with #ifdef ARROW_WITH_OPENTELEMETRY
cpp/src/parquet/arrow/reader.cc
Outdated
| auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan(); | ||
| ::arrow::util::tracing::Span childspan; | ||
| ::arrow::util::tracing::Span parentspan; | ||
| parentspan.Set(::arrow::util::tracing::Span::Impl{span}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a confusing way to create a parent span. Why does GetCurrentSpan() not return something we can use? Why do we need parentspan?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at this again, it seems superfluous. I'll remove it and just start a span the normal way. That should already take care of setting the current span as the parent automatically.
cpp/src/parquet/arrow/reader.cc
Outdated
| if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready); | ||
| return ready.Then([=]() -> ::arrow::Future<RecordBatchGenerator> { | ||
|
|
||
| GET_CURRENT_SPAN(span); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a confusing use of macros. At the very least I'd want to see something like GET_CURRENT_SPAN(&span)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that would be better, but then the span object declaration is not guarded with the #ifdef ARROW_WITH_OPENTELEMETRY (that was the reason I did it like this). But I'll try it and see if it works.
| [=](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable | ||
| -> Result<RecordBatchGenerator> { | ||
| #ifdef ARROW_WITH_OPENTELEMETRY | ||
| auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? What call is accessing this scope? By the time we get to line 427 I think this scope has been destroyed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The span is automatically ended when the scope is destroyed, so that is what we want. I tried to create a generic wrapper that can wrap this lambda like we do with the AsyncGenerators etc, but I couldn't manage to figure it out so I dropped it for now. Do you think such an approach is possible?
|
Ah, I was thinking we could also have a Future carry a Span that it would activate when running callbacks/continuations. But yes, we would also want to instrument Executor as well. (And maybe instead of putting Span on Future, it should be attached to the callback itself.) |
This could work as well. So every time we add a callback we (while adding the callback) grab the active span. We then wrap the callback function like... (although you'd have to do an actual struct wrapper because I think that's basically the same thing as what I was suggesting on the thread pool but it would only affect futures. Given most of our thread tasks are futures I think this would probably be acceptable but I think I'd still lean towards implementing this in the executor if I were going to tackle it myself. |
|
With the thread pool, if the future isn't complete, then we don't call into the thread pool/spawn any tasks right? So it won't have a chance to capture the correct current span? |
|
If no thread task is spawned then the thread local storage of the current span should be sufficient. |
|
Sorry, I might be misunderstanding, but what I mean is: if we have That may not run the callback immediately, right? And/or, we have no idea what thread the callback will get run on in general (it's up to the future, or possibly options). So instrumenting the executor isn't enough, we have to instrument the future or the callback itself. |
|
Ah, I guess if you instrument the executor, the resulting future will presumably be in the right context…but to me, from looking just at local code, that seems like a less obvious property (and what if there's a generator or something else in the chain?) |
|
Ah, yes, good point. I had kind of forgotten that task submission was deferred. The callback will inherit the active span from the completing thread and not the thread that called Then/AddCallback. Ok, I agree that capturing the span in Then/AddCallback is more intuitive. |
|
I pushed some code trying to do what Weston suggested, but it doesn't compile. The (first!) problem is apparently that when including |
lidavidm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks
|
(By the way: please mark this ready for review if you think it's good) |
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks much better, thank you for taking the time to figure it out.
cpp/src/arrow/dataset/file_csv.cc
Outdated
| ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); | ||
|
|
||
| ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); | ||
| auto path = source.path(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| auto path = source.path(); | |
| const auto& path = source.path(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| const std::string& span_name) { | ||
| return [=]() mutable -> Future<T> { | ||
| auto span = GetTracer()->StartSpan(span_name); | ||
| auto span = GetTracer()->StartSpan(span_name, {}, options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do StartSpan(span_name, options)? If not, then let's do StartSpan(span_name, /*attributes=*/{}, options)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate
| const std::string& span_name) { | ||
| return [=]() mutable -> Future<T> { | ||
| auto span = GetTracer()->StartSpan(span_name); | ||
| auto span = GetTracer()->StartSpan(span_name, {}, options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do StartSpan(span_name, options)? If not, then let's do StartSpan(span_name, /*attributes=*/{}, options)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've refactored this a little bit in f32fda8; options has been removed here
| AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped, | ||
| opentelemetry::trace::StartSpanOptions options, | ||
| const std::string& span_name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: It's not obvious to me at a glance that I'd be bypassing the normal process of assigning a parent by using this overload. I'm not certain we want to do this but if the overload was...
AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped, const std::string& span_name, nostd::variant<SpanContext, opentelemetry::context::Context> parent = SpanContext::GetInvalid())
It would be more obvious to me:
- Why someone would supply options
- By supplying options, I am bypassing the normal process of assigning the parent
Do we call this overload anywhere? If not, can we get rid of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate
| AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped, | ||
| opentelemetry::trace::StartSpanOptions options, | ||
| const std::string& span_name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: It's not obvious to me at a glance that I'd be bypassing the normal process of assigning a parent by using this overload. I'm not certain we want to do this but if the overload was...
AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped, const std::string& span_name, nostd::variant<SpanContext, opentelemetry::context::Context> parent = SpanContext::GetInvalid())
It would be more obvious to me:
- Why someone would supply options
- By supplying options, I am bypassing the normal process of assigning the parent
Do we call this overload anywhere? If not, can we get rid of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've refactored this a little bit in f32fda8, it doesn't pass options anymore, instead it just gets the current span in the function itself.
| if (!span->GetContext().IsValid()) return wrapped; | ||
| return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The distinction between PropagateSpanThroughAsyncGenerator, TieSpanToAsyncGenerator, and WrapAsyncGenerator are subtle. Would it be possible to add a short block of comment prose above these methods describing "why" you pick one or the other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed; they are so very similar that I've merged 2 of them into 1. The behavior is then switched with an argument. I'd love to hear your opinion on that: 71f6989
I've also written an addition comment block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, looks much better, thank you. So if I understand correctly:
Wrap -> The generator "is the activity"
Propagate -> The generator "is a part of a larger activity"
| #define GET_CURRENT_SPAN(lhs) \ | ||
| lhs = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan() | ||
|
|
||
| #define SET_SPAN_SCOPE(lhs, span) \ | ||
| lhs = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(span) | ||
|
|
||
| #define TIE_SPAN_TO_GENERATOR(generator) \ | ||
| generator = ::arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(generator)) | ||
|
|
||
| #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ | ||
| generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( \ | ||
| std::move(generator)) | ||
|
|
||
| #define WRAP_ASYNC_GENERATOR(generator, name) \ | ||
| generator = ::arrow::internal::tracing::WrapAsyncGenerator(std::move(generator), name) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do we gain making these macros instead of static methods? If the namespaces are troublesome then lets just put the methods in the arrow namespace. That seems a better compromise than introducing a macro.
namespace arrow {
nostd::shared_ptr<Span> GetCurrentSpan() {
return ::arrow::internal::tracing::GetTracer()->GetCurrentSpan();
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was discussed earlier; this was my response:
Their purpose is to have blank versions when compiling without opentelemetry
#ifdef ARROW_WITH_OPENTELEMETRY
#define GET_CURRENT_SPAN(span) \
auto span = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()
...
#else
#define START_SPAN(target_span, ...)
So if I remove them, I need to guard all of the calls to these functions with #ifdef ARROW_WITH_OPENTELEMETRY
cpp/src/arrow/dataset/file_ipc.cc
Outdated
| #ifdef ARROW_WITH_OPENTELEMETRY | ||
| opentelemetry::trace::StartSpanOptions span_options; | ||
| span_options.parent = parent_span->GetContext(); | ||
| generator = arrow::internal::tracing::WrapAsyncGenerator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do I need to manually specify the parent here? Can we add a comment explaining?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to be unnecessary - I've removed it
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating all of this. I'll have to get back to profiling soon so I can play with the results.
| if (!span->GetContext().IsValid()) return wrapped; | ||
| return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, looks much better, thank you. So if I understand correctly:
Wrap -> The generator "is the activity"
Propagate -> The generator "is a part of a larger activity"
| return st; \ | ||
| }) | ||
|
|
||
| #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I accept your rationale for these macros but could we add a brief comment with that rationale above these one-liner macros? Otherwise I'm afraid I'll end up asking you yet again 😆
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Propagate -> The generator "is a part of a larger activity"
Yes, in this case, the span is not ended when the generator finishes. It is just needed so that any spans created in the asynchronously running code (possibly in other threads) do not end up becoming orphan spans in a separate trace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a comment here 567fa30
|
@joosthooz @westonpace is there anything left to do here? (I see one thing listed in the PR description, was that already checked?) |
|
I've checked parquet and feather, still need to check csv but that didn't work from arrowbench. I'll try to get some test up and running for it. Besides that I'm happy with the current state. |
|
Ok, sounds good. We can keep this open until you get a chance to test CSV |
|
I just ran all the ctests with the otlp trace exporter enabled, and had a look at the traces for parquet, feather, and csv. As far as I can tell from looking at a couple of traces, they seem to behave consistent. |
|
Merged, thanks for carrying this through! |
|
Benchmark runs are scheduled for baseline = 542158f and contender = 8ea2c93. 8ea2c93 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Continuing #12328 and #11964.
The tracing spans were not propagated through all the asynchronous constructs, causing some spans to become disconnected from the trace. This PR aims to address this.
Some things left to do:
read_columnspan#ifdefsFutureExecutorFragmentToBatchesspan seems to be active way too long